Add --uid-pool option to gnt-cluster init
[ganeti-local] / lib / jqueue.py
index 2c2a5fa..62a5f4a 100644 (file)
@@ -49,9 +49,10 @@ from ganeti import rpc
 
 
 JOBQUEUE_THREADS = 25
 
 
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
 
 
 
-class CancelJob:
+class CancelJob(Exception):
   """Special exception to cancel a job.
 
   """
   """Special exception to cancel a job.
 
   """
@@ -68,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -79,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -140,17 +145,21 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "lock_status", "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -170,15 +179,24 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -194,11 +212,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -222,7 +242,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
@@ -285,7 +304,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -303,37 +322,112 @@ class _QueuedJob(object):
 
     return entries
 
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    @param status: a given opcode status
+    @param result: the opcode result
 
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
+
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
+
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
 
     """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
+
+    self._queue = queue
+    self._job = job
+    self._op = op
 
 
-    self.queue.acquire()
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+    """
+    self._queue.acquire()
     try:
     try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
+
+      # All locks are acquired by now
+      self._job.lock_status = None
 
       # Cancel here if we were asked to
 
       # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
+      if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
 
         raise CancelJob()
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
+      self._op.status = constants.OP_STATUS_RUNNING
+    finally:
+      self._queue.release()
+
+  def Feedback(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 3
+
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
+
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
+
+    self._queue.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+
+      self._job.change.notifyAll()
     finally:
     finally:
-      self.queue.release()
+      self._queue.release()
+
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
 
 
-  def RunTask(self, job):
+  """
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
     """Job executor.
 
     This functions processes a job. It is closely tied to the _QueuedJob and
@@ -343,21 +437,32 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
-                  self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
+    logging.info("Processing job %s", job.id)
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
+    queue = job.queue
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
+          op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
           try:
-            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                         op_summary)
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -369,34 +474,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+            # Make sure not to hold queue lock while calling ExecOpCode
+            result = proc.ExecOpCode(input_opcode,
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
 
             queue.acquire()
             try:
@@ -407,8 +487,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
             finally:
               queue.release()
 
-            logging.debug("Op %s/%s: Successfully finished %s",
-                          idx + 1, count, op)
+            logging.info("Op %s/%s: Successfully finished opcode %s",
+                         idx + 1, count, op_summary)
           except CancelJob:
             # Will be handled further up
             raise
           except CancelJob:
             # Will be handled further up
             raise
@@ -417,9 +497,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
                 op.end_timestamp = TimeStampNow()
-                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -440,7 +524,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -448,8 +532,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
           status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+
+      logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -457,38 +541,44 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
 
                                               _JobQueueWorker)
     self.queue = queue
 
 
-class JobQueue(object):
-  """Quue used to manaage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' argument.
 
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -569,9 +659,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -595,7 +684,11 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
@@ -611,17 +704,15 @@ class JobQueue(object):
 
     for file_name in files:
       # Read file content
 
     for file_name in files:
       # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+      content = utils.ReadFile(file_name)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
 
     self._nodes[node_name] = node.primary_ip
 
 
     self._nodes[node_name] = node.primary_ip
 
@@ -640,12 +731,13 @@ class JobQueue(object):
     except KeyError:
       pass
 
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fails.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -658,13 +750,13 @@ class JobQueue(object):
     success = []
 
     for node in nodes:
     success = []
 
     for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
+      msg = result[node].fail_msg
+      if msg:
         failed.append(node)
         failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
+      else:
+        success.append(node)
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
@@ -702,26 +794,27 @@ class JobQueue(object):
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
+  def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
 
     """
 
     """
-    os.rename(old, new)
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
 
 
+    # ... and on all nodes
     names, addrs = self._GetNodeIp()
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -741,26 +834,43 @@ class JobQueue(object):
 
     return str(job_id)
 
 
     return str(job_id)
 
-  def _NewSerialUnlocked(self):
+  @classmethod
+  def _GetArchiveDirectory(cls, job_id):
+    """Returns the archive directory for a job.
+
+    @type job_id: str
+    @param job_id: Job identifier
+    @rtype: str
+    @return: Directory name
+
+    """
+    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
+
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -772,10 +882,10 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
 
-  @staticmethod
-  def _GetArchivedJobPath(job_id):
+  @classmethod
+  def _GetArchivedJobPath(cls, job_id):
     """Returns the archived job file for a give job id.
 
     @type job_id: str
     """Returns the archived job file for a give job id.
 
     @type job_id: str
@@ -784,7 +894,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
     @return: the path to the archived job file
 
     """
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
   @classmethod
   def _ExtractJobID(cls, name):
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -818,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -852,19 +964,17 @@ class JobQueue(object):
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
-      fd = open(filepath, "r")
+      raw_data = utils.ReadFile(filepath)
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
+
+    data = serializer.LoadJson(raw_data)
 
     try:
       job = _QueuedJob.Restore(self, data)
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -872,7 +982,7 @@ class JobQueue(object):
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(filepath, new_path)])
       return None
 
     self._memcache[job_id] = job
       return None
 
     self._memcache[job_id] = job
@@ -915,7 +1025,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
 
     """
     if drain_flag:
@@ -924,14 +1034,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -940,9 +1051,19 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    # Check job queue size
+    size = len(self._ListJobFiles())
+    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+      # TODO: Autoarchive jobs. Make sure it's not done on every job
+      # submission, though.
+      #size = ...
+      pass
+
+    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
+
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -956,6 +1077,38 @@ class JobQueue(object):
 
     return job.id
 
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1002,17 +1155,13 @@ class JobQueue(object):
         as such by the clients
 
     """
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1026,25 +1175,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
 
-      logging.debug("Waiting again")
+      raise utils.RetryAgain()
 
 
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
-
-    logging.debug("Job %s changed", job_id)
-
-    return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1068,8 +1216,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1078,8 +1226,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1090,56 +1237,69 @@ class JobQueue(object):
 
     """
     try:
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
     finally:
       self.UpdateJobUnlocked(job)
 
   @_RequireOpenQueue
-  def _ArchiveJobUnlocked(self, job_id):
-    """Archives a job.
+  def _ArchiveJobsUnlocked(self, jobs):
+    """Archives jobs.
 
 
-    @type job_id: string
-    @param job_id: the ID of job to be archived
+    @type jobs: list of L{_QueuedJob}
+    @param jobs: Job objects
+    @rtype: int
+    @return: Number of archived jobs
 
     """
 
     """
-    logging.info("Archiving job %s", job_id)
+    archive_jobs = []
+    rename_files = []
+    for job in jobs:
+      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
+                                  constants.JOB_STATUS_SUCCESS,
+                                  constants.JOB_STATUS_ERROR):
+        logging.debug("Job %s is not yet done", job.id)
+        continue
 
 
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return
+      archive_jobs.append(job)
 
 
-    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                constants.JOB_STATUS_SUCCESS,
-                                constants.JOB_STATUS_ERROR):
-      logging.debug("Job %s is not yet done", job.id)
-      return
+      old = self._GetJobPath(job.id)
+      new = self._GetArchivedJobPath(job.id)
+      rename_files.append((old, new))
 
 
-    old = self._GetJobPath(job.id)
-    new = self._GetArchivedJobPath(job.id)
+    # TODO: What if 1..n files fail to rename?
+    self._RenameFilesUnlocked(rename_files)
 
 
-    self._RenameFileUnlocked(old, new)
+    logging.debug("Successfully archived job(s) %s",
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
 
-    logging.debug("Successfully archived job %s", job.id)
+    return len(archive_jobs)
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
 
-    This is just a wrapper over L{_ArchiveJobUnlocked}.
+    This is just a wrapper over L{_ArchiveJobsUnlocked}.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
 
     @type job_id: string
     @param job_id: Job ID of job to be archived.
+    @rtype: bool
+    @return: Whether job was archived
 
     """
 
     """
-    return self._ArchiveJobUnlocked(job_id)
+    logging.info("Archiving job %s", job_id)
+
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return False
+
+    return self._ArchiveJobsUnlocked([job]) == 1
 
   @utils.LockedMethod
   @_RequireOpenQueue
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AutoArchiveJobs(self, age):
+  def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
     """Archives all jobs based on age.
 
     The method will archive all jobs which are older than the age
@@ -1154,24 +1314,47 @@ class JobQueue(object):
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for jid in self._GetJobIDsUnlocked(archived=False):
-      job = self._LoadJobUnlocked(jid)
-      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
-                                  constants.OP_STATUS_ERROR,
-                                  constants.OP_STATUS_CANCELED):
-        continue
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx + 1
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
+      # Returns None if the job failed to load
+      job = self._LoadJobUnlocked(job_id)
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
 
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(jid)
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
 
 
-  def _GetJobInfoUnlocked(self, job, fields):
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
+
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
+
+    return (archived_count, len(all_job_ids) - last_touched)
+
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
@@ -1208,6 +1391,8 @@ class JobQueue(object):
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else:
       elif fname == "summary":
         row.append([op.input.Summary() for op in job.ops])
       else: