Support IPv6 cluster init
[ganeti-local] / lib / jqueue.py
index a830fda..e54fe96 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -31,26 +31,39 @@ used by all other classes in this module.
 
 import os
 import logging
-import threading
 import errno
 import re
 import time
 import weakref
 
+try:
+  # pylint: disable-msg=E0611
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
+from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
+from ganeti import locking
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
+from ganeti import netutils
+from ganeti import compat
 
 
 JOBQUEUE_THREADS = 25
 JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
+# member lock names to be passed to @ssynchronized decorator
+_LOCK = "_lock"
+_QUEUE = "_queue"
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -155,7 +168,6 @@ class _QueuedJob(object):
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @ivar lock_status: In-memory locking information for debugging
-  @ivar change: a Condition variable we use for waiting for job changes
 
   """
   # pylint: disable-msg=W0212
@@ -177,8 +189,7 @@ class _QueuedJob(object):
 
     """
     if not ops:
-      # TODO: use a better exception
-      raise Exception("No opcodes")
+      raise errors.GenericError("A job needs at least one opcode")
 
     self.queue = queue
     self.id = job_id
@@ -191,9 +202,6 @@ class _QueuedJob(object):
     # In-memory attributes
     self.lock_status = None
 
-    # Condition to wait for changes
-    self.change = threading.Condition(self.queue._lock)
-
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
@@ -231,9 +239,6 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
-    # Condition to wait for changes
-    obj.change = threading.Condition(obj.queue._lock)
-
     return obj
 
   def Serialize(self):
@@ -326,6 +331,51 @@ class _QueuedJob(object):
 
     return entries
 
+  def GetInfo(self, fields):
+    """Returns information about a job.
+
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: list with one element for each field
+    @raise errors.OpExecError: when an invalid field
+        has been passed
+
+    """
+    row = []
+    for fname in fields:
+      if fname == "id":
+        row.append(self.id)
+      elif fname == "status":
+        row.append(self.CalcStatus())
+      elif fname == "ops":
+        row.append([op.input.__getstate__() for op in self.ops])
+      elif fname == "opresult":
+        row.append([op.result for op in self.ops])
+      elif fname == "opstatus":
+        row.append([op.status for op in self.ops])
+      elif fname == "oplog":
+        row.append([op.log for op in self.ops])
+      elif fname == "opstart":
+        row.append([op.start_timestamp for op in self.ops])
+      elif fname == "opexec":
+        row.append([op.exec_timestamp for op in self.ops])
+      elif fname == "opend":
+        row.append([op.end_timestamp for op in self.ops])
+      elif fname == "received_ts":
+        row.append(self.received_timestamp)
+      elif fname == "start_ts":
+        row.append(self.start_timestamp)
+      elif fname == "end_ts":
+        row.append(self.end_timestamp)
+      elif fname == "lock_status":
+        row.append(self.lock_status)
+      elif fname == "summary":
+        row.append([op.input.Summary() for op in self.ops])
+      else:
+        raise errors.OpExecError("Invalid self query field '%s'" % fname)
+    return row
+
   def MarkUnfinishedOps(self, status, result):
     """Mark unfinished opcodes with a given status and result.
 
@@ -337,14 +387,17 @@ class _QueuedJob(object):
     @param result: the opcode result
 
     """
-    not_marked = True
-    for op in self.ops:
-      if op.status in constants.OPS_FINALIZED:
-        assert not_marked, "Finalized opcodes found after non-finalized ones"
-        continue
-      op.status = status
-      op.result = result
-      not_marked = False
+    try:
+      not_marked = True
+      for op in self.ops:
+        if op.status in constants.OPS_FINALIZED:
+          assert not_marked, "Finalized opcodes found after non-finalized ones"
+          continue
+        op.status = status
+        op.result = result
+        not_marked = False
+    finally:
+      self.queue.UpdateJobUnlocked(self)
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -367,6 +420,16 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._job = job
     self._op = op
 
+  def _CheckCancel(self):
+    """Raises an exception to cancel the job if asked to.
+
+    """
+    # Cancel here if we were asked to
+    if self._op.status == constants.OP_STATUS_CANCELING:
+      logging.debug("Canceling opcode")
+      raise CancelJob()
+
+  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -376,22 +439,30 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
-    self._queue.acquire()
-    try:
-      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
-                                 constants.OP_STATUS_CANCELING)
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
 
-      # All locks are acquired by now
-      self._job.lock_status = None
+    # All locks are acquired by now
+    self._job.lock_status = None
 
-      # Cancel here if we were asked to
-      if self._op.status == constants.OP_STATUS_CANCELING:
-        raise CancelJob()
+    # Cancel here if we were asked to
+    self._CheckCancel()
 
-      self._op.status = constants.OP_STATUS_RUNNING
-      self._op.exec_timestamp = TimeStampNow()
-    finally:
-      self._queue.release()
+    logging.debug("Opcode is now running")
+    self._op.status = constants.OP_STATUS_RUNNING
+    self._op.exec_timestamp = TimeStampNow()
+
+    # And finally replicate the job status
+    self._queue.UpdateJobUnlocked(self._job)
+
+  @locking.ssynchronized(_QUEUE, shared=1)
+  def _AppendFeedback(self, timestamp, log_type, log_msg):
+    """Internal feedback append function, with locks
+
+    """
+    self._job.log_serial += 1
+    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+    self._queue.UpdateJobUnlocked(self._job, replicate=False)
 
   def Feedback(self, *args):
     """Append a log entry.
@@ -408,15 +479,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # The time is split to make serialization easier and not lose
     # precision.
     timestamp = utils.SplitTime(time.time())
-
-    self._queue.acquire()
-    try:
-      self._job.log_serial += 1
-      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
-
-      self._job.change.notifyAll()
-    finally:
-      self._queue.release()
+    self._AppendFeedback(timestamp, log_type, log_msg)
 
   def ReportLocks(self, msg):
     """Write locking information to the job.
@@ -424,9 +487,208 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Called whenever the LU processor is waiting for a lock or has acquired one.
 
     """
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
+
     # Not getting the queue lock because this is a single assignment
     self._job.lock_status = msg
 
+    # Cancel here if we were asked to
+    self._CheckCancel()
+
+
+class _JobChangesChecker(object):
+  def __init__(self, fields, prev_job_info, prev_log_serial):
+    """Initializes this class.
+
+    @type fields: list of strings
+    @param fields: Fields requested by LUXI client
+    @type prev_job_info: string
+    @param prev_job_info: previous job info, as passed by the LUXI client
+    @type prev_log_serial: string
+    @param prev_log_serial: previous job serial, as passed by the LUXI client
+
+    """
+    self._fields = fields
+    self._prev_job_info = prev_job_info
+    self._prev_log_serial = prev_log_serial
+
+  def __call__(self, job):
+    """Checks whether job has changed.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+
+    """
+    status = job.CalcStatus()
+    job_info = job.GetInfo(self._fields)
+    log_entries = job.GetLogEntries(self._prev_log_serial)
+
+    # Serializing and deserializing data can cause type changes (e.g. from
+    # tuple to list) or precision loss. We're doing it here so that we get
+    # the same modifications as the data received from the client. Without
+    # this, the comparison afterwards might fail without the data being
+    # significantly different.
+    # TODO: we just deserialized from disk, investigate how to make sure that
+    # the job info and log entries are compatible to avoid this further step.
+    # TODO: Doing something like in testutils.py:UnifyValueType might be more
+    # efficient, though floats will be tricky
+    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+    # Don't even try to wait if the job is no longer running, there will be
+    # no changes.
+    if (status not in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_RUNNING,
+                       constants.JOB_STATUS_WAITLOCK) or
+        job_info != self._prev_job_info or
+        (log_entries and self._prev_log_serial != log_entries[0][0])):
+      logging.debug("Job %s changed", job.id)
+      return (job_info, log_entries)
+
+    return None
+
+
+class _JobFileChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+    @raises errors.InotifyError: if the notifier cannot be setup
+
+    """
+    self._wm = pyinotify.WatchManager()
+    self._inotify_handler = \
+      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
+    self._notifier = \
+      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
+    try:
+      self._inotify_handler.enable()
+    except Exception:
+      # pyinotify doesn't close file descriptors automatically
+      self._notifier.stop()
+      raise
+
+  def _OnInotify(self, notifier_enabled):
+    """Callback for inotify.
+
+    """
+    if not notifier_enabled:
+      self._inotify_handler.enable()
+
+  def Wait(self, timeout):
+    """Waits for the job file to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    assert timeout >= 0
+    have_events = self._notifier.check_events(timeout * 1000)
+    if have_events:
+      self._notifier.read_events()
+    self._notifier.process_events()
+    return have_events
+
+  def Close(self):
+    """Closes underlying notifier and its file descriptor.
+
+    """
+    self._notifier.stop()
+
+
+class _JobChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+
+    """
+    self._filewaiter = None
+    self._filename = filename
+
+  def Wait(self, timeout):
+    """Waits for a job to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    if self._filewaiter:
+      return self._filewaiter.Wait(timeout)
+
+    # Lazy setup: Avoid inotify setup cost when job file has already changed.
+    # If this point is reached, return immediately and let caller check the job
+    # file again in case there were changes since the last check. This avoids a
+    # race condition.
+    self._filewaiter = _JobFileChangesWaiter(self._filename)
+
+    return True
+
+  def Close(self):
+    """Closes underlying waiter.
+
+    """
+    if self._filewaiter:
+      self._filewaiter.Close()
+
+
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
+
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
+
+  """
+  @staticmethod
+  def _CheckForChanges(job_load_fn, check_fn):
+    job = job_load_fn()
+    if not job:
+      raise errors.JobLost()
+
+    result = check_fn(job)
+    if result is None:
+      raise utils.RetryAgain()
+
+    return result
+
+  def __call__(self, filename, job_load_fn,
+               fields, prev_job_info, prev_log_serial, timeout):
+    """Waits for changes on a job.
+
+    @type filename: string
+    @param filename: File on which to wait for changes
+    @type job_load_fn: callable
+    @param job_load_fn: Function to load job
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait in seconds
+
+    """
+    try:
+      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
+      waiter = _JobChangesWaiter(filename)
+      try:
+        return utils.Retry(compat.partial(self._CheckForChanges,
+                                          job_load_fn, check_fn),
+                           utils.RETRY_REMAINING_TIME, timeout,
+                           wait_fn=waiter.Wait)
+      finally:
+        waiter.Close()
+    except (errors.InotifyError, errors.JobLost):
+      return None
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -463,11 +725,14 @@ class _JobQueueWorker(workerpool.BaseWorker):
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               if op.status == constants.OP_STATUS_CANCELED:
+                logging.debug("Canceling opcode")
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
+              logging.debug("Opcode %s/%s waiting for locks",
+                            idx + 1, count)
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -483,11 +748,20 @@ class _JobQueueWorker(workerpool.BaseWorker):
             result = proc.ExecOpCode(input_opcode,
                                      _OpExecCallbacks(queue, job, op))
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
+              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
               op.status = constants.OP_STATUS_SUCCESS
               op.result = result
               op.end_timestamp = TimeStampNow()
+              if idx == count - 1:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
+
+                # Consistency check
+                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
+                                  for i in job.ops)
+
               queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
@@ -498,27 +772,46 @@ class _JobQueueWorker(workerpool.BaseWorker):
             # Will be handled further up
             raise
           except Exception, err:
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               try:
+                logging.debug("Opcode %s/%s failed", idx + 1, count)
                 op.status = constants.OP_STATUS_ERROR
                 if isinstance(err, errors.GenericError):
-                  op.result = errors.EncodeException(err)
+                  to_encode = err
                 else:
-                  op.result = str(err)
+                  to_encode = errors.OpExecError(str(err))
+                op.result = errors.EncodeException(to_encode)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
+
+                to_encode = errors.OpExecError("Preceding opcode failed")
+                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                      errors.EncodeException(to_encode))
+
+                # Consistency check
+                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
+                                  for i in job.ops[:idx])
+                assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                                  errors.GetEncodedError(i.result)
+                                  for i in job.ops[idx:])
               finally:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
                 queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
             raise
 
       except CancelJob:
-        queue.acquire()
+        queue.acquire(shared=1)
         try:
-          queue.CancelJobUnlocked(job)
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          job.lock_status = None
+          job.end_timestamp = TimeStampNow()
+          queue.UpdateJobUnlocked(job)
         finally:
           queue.release()
       except errors.GenericError, err:
@@ -526,19 +819,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
       except:
         logging.exception("Unhandled exception")
     finally:
-      queue.acquire()
-      try:
-        try:
-          job.lock_status = None
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          job_id = job.id
-          status = job.CalcStatus()
-      finally:
-        queue.release()
-
-      logging.info("Finished job %s, status = %s", job_id, status)
+      status = job.CalcStatus()
+      logging.info("Finished job %s, status = %s", job.id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -559,12 +841,12 @@ def _RequireOpenQueue(fn):
   functions usually called from other classes. Note that this should
   be applied only to methods (not plain functions), since it expects
   that the decorated function is called with a first argument that has
-  a '_queue_lock' argument.
+  a '_queue_filelock' argument.
 
-  @warning: Use this decorator only after utils.LockedMethod!
+  @warning: Use this decorator only after locking.ssynchronized
 
   Example::
-    @utils.LockedMethod
+    @locking.ssynchronized(_LOCK)
     @_RequireOpenQueue
     def Example(self):
       pass
@@ -572,7 +854,7 @@ def _RequireOpenQueue(fn):
   """
   def wrapper(self, *args, **kwargs):
     # pylint: disable-msg=W0212
-    assert self._queue_lock is not None, "Queue should be open"
+    assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
 
@@ -600,15 +882,21 @@ class JobQueue(object):
     """
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
-    self._my_hostname = utils.HostInfo().name
+    self._my_hostname = netutils.Hostname.GetSysName()
+
+    # The Big JobQueue lock. If a code block or method acquires it in shared
+    # mode safe it must guarantee concurrency with all the code acquiring it in
+    # shared mode, including itself. In order not to acquire it at all
+    # concurrency must be guaranteed with all code acquiring it in shared mode
+    # and all code acquiring it exclusively.
+    self._lock = locking.SharedLock("JobQueue")
 
-    # Locking
-    self._lock = threading.Lock()
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
-    # Initialize
-    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
+    # Initialize the queue, and acquire the filelock.
+    # This ensures no other process is working on the job queue.
+    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
 
     # Read serial file
     self._last_serial = jstore.ReadSerial()
@@ -621,13 +909,14 @@ class JobQueue(object):
                        if n.master_candidate)
 
     # Remove master node
-    try:
-      del self._nodes[self._my_hostname]
-    except KeyError:
-      pass
+    self._nodes.pop(self._my_hostname, None)
 
     # TODO: Check consistency across nodes
 
+    self._queue_size = 0
+    self._UpdateQueueSizeUnlocked()
+    self._drained = self._IsQueueMarkedDrain()
+
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
@@ -657,17 +946,14 @@ class JobQueue(object):
           status = job.CalcStatus()
 
           if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask(job)
+            self._wpool.AddTask((job, ))
 
           elif status in (constants.JOB_STATUS_RUNNING,
                           constants.JOB_STATUS_WAITLOCK,
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
-            try:
-              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                    "Unclean master daemon shutdown")
-            finally:
-              self.UpdateJobUnlocked(job)
+            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                  "Unclean master daemon shutdown")
 
         logging.info("Job queue inspection finished")
       finally:
@@ -676,7 +962,7 @@ class JobQueue(object):
       self._wpool.TerminateWorkers()
       raise
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
     """Register a new node with the queue.
@@ -721,7 +1007,7 @@ class JobQueue(object):
 
     self._nodes[node_name] = node.primary_ip
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     """Callback called when removing nodes from the cluster.
@@ -730,11 +1016,7 @@ class JobQueue(object):
     @param node_name: the name of the node to remove
 
     """
-    try:
-      # The queue is removed by the "leave node" RPC call.
-      del self._nodes[node_name]
-    except KeyError:
-      pass
+    self._nodes.pop(node_name, None)
 
   @staticmethod
   def _CheckRpcResult(result, nodes, failmsg):
@@ -776,11 +1058,12 @@ class JobQueue(object):
         names and the second one with the node addresses
 
     """
+    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
     name_list = self._nodes.keys()
     addr_list = [self._nodes[name] for name in name_list]
     return name_list, addr_list
 
-  def _WriteAndReplicateFileUnlocked(self, file_name, data):
+  def _UpdateJobQueueFile(self, file_name, data, replicate):
     """Writes a file locally and then replicates it to all nodes.
 
     This function will replace the contents of a file on the local
@@ -790,14 +1073,16 @@ class JobQueue(object):
     @param file_name: the path of the file to be replicated
     @type data: str
     @param data: the new contents of the file
+    @type replicate: boolean
+    @param replicate: whether to spread the changes to the remote nodes
 
     """
     utils.WriteFile(file_name, data=data)
 
-    names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
-    self._CheckRpcResult(result, self._nodes,
-                         "Updating %s" % file_name)
+    if replicate:
+      names, addrs = self._GetNodeIp()
+      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
@@ -867,8 +1152,8 @@ class JobQueue(object):
     serial = self._last_serial + count
 
     # Write to file
-    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
-                                        "%s\n" % serial)
+    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+                             "%s\n" % serial, True)
 
     result = [self._FormatJobID(v)
               for v in range(self._last_serial, serial + 1)]
@@ -902,53 +1187,28 @@ class JobQueue(object):
     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
-  @classmethod
-  def _ExtractJobID(cls, name):
-    """Extract the job id from a filename.
-
-    @type name: str
-    @param name: the job filename
-    @rtype: job id or None
-    @return: the job id corresponding to the given filename,
-        or None if the filename does not represent a valid
-        job file
-
-    """
-    m = cls._RE_JOB_FILE.match(name)
-    if m:
-      return m.group(1)
-    else:
-      return None
-
-  def _GetJobIDsUnlocked(self, archived=False):
+  def _GetJobIDsUnlocked(self, sort=True):
     """Return all known job IDs.
 
-    If the parameter archived is True, archived jobs IDs will be
-    included. Currently this argument is unused.
-
     The method only looks at disk because it's a requirement that all
     jobs are present on disk (so in the _memcache we don't have any
     extra IDs).
 
+    @type sort: boolean
+    @param sort: perform sorting on the returned job ids
     @rtype: list
     @return: the list of job IDs
 
     """
-    # pylint: disable-msg=W0613
-    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
-    jlist = utils.NiceSort(jlist)
+    jlist = []
+    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
+      m = self._RE_JOB_FILE.match(filename)
+      if m:
+        jlist.append(m.group(1))
+    if sort:
+      jlist = utils.NiceSort(jlist)
     return jlist
 
-  def _ListJobFiles(self):
-    """Returns the list of current job files.
-
-    @rtype: list
-    @return: the list of job file names
-
-    """
-    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
-            if self._RE_JOB_FILE.match(name)]
-
   def _LoadJobUnlocked(self, job_id):
     """Loads a job from the disk or memory.
 
@@ -966,48 +1226,72 @@ class JobQueue(object):
       logging.debug("Found job %s in memcache", job_id)
       return job
 
-    filepath = self._GetJobPath(job_id)
-    logging.debug("Loading job from %s", filepath)
-    try:
-      raw_data = utils.ReadFile(filepath)
-    except IOError, err:
-      if err.errno in (errno.ENOENT, ):
-        return None
-      raise
-
-    data = serializer.LoadJson(raw_data)
-
     try:
-      job = _QueuedJob.Restore(self, data)
-    except Exception, err: # pylint: disable-msg=W0703
+      job = self._LoadJobFromDisk(job_id)
+      if job is None:
+        return job
+    except errors.JobFileCorrupted:
+      old_path = self._GetJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
-      if filepath == new_path:
+      if old_path == new_path:
         # job already archived (future case)
         logging.exception("Can't parse job %s", job_id)
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFilesUnlocked([(filepath, new_path)])
+        self._RenameFilesUnlocked([(old_path, new_path)])
       return None
 
     self._memcache[job_id] = job
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _GetJobsUnlocked(self, job_ids):
-    """Return a list of jobs based on their IDs.
+  def _LoadJobFromDisk(self, job_id):
+    """Load the given job file from disk.
 
-    @type job_ids: list
-    @param job_ids: either an empty list (meaning all jobs),
-        or a list of job IDs
-    @rtype: list
-    @return: the list of job objects
+    Given a job file, read, load and restore it in a _QueuedJob format.
+
+    @type job_id: string
+    @param job_id: job identifier
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
 
     """
-    if not job_ids:
-      job_ids = self._GetJobIDsUnlocked()
+    filepath = self._GetJobPath(job_id)
+    logging.debug("Loading job from %s", filepath)
+    try:
+      raw_data = utils.ReadFile(filepath)
+    except EnvironmentError, err:
+      if err.errno in (errno.ENOENT, ):
+        return None
+      raise
 
-    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
+    try:
+      data = serializer.LoadJson(raw_data)
+      job = _QueuedJob.Restore(self, data)
+    except Exception, err: # pylint: disable-msg=W0703
+      raise errors.JobFileCorrupted(err)
+
+    return job
+
+  def SafeLoadJobFromDisk(self, job_id):
+    """Load the given job file from disk.
+
+    Given a job file, read, load and restore it in a _QueuedJob format.
+    In case of error reading the job, it gets returned as None, and the
+    exception is logged.
+
+    @type job_id: string
+    @param job_id: job identifier
+    @rtype: L{_QueuedJob} or None
+    @return: either None or the job object
+
+    """
+    try:
+      return self._LoadJobFromDisk(job_id)
+    except (errors.JobFileCorrupted, EnvironmentError):
+      logging.exception("Can't load/parse job %s", job_id)
+      return None
 
   @staticmethod
   def _IsQueueMarkedDrain():
@@ -1022,8 +1306,15 @@ class JobQueue(object):
     """
     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
+
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
@@ -1034,6 +1325,9 @@ class JobQueue(object):
       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
     else:
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+    self._drained = drain_flag
+
     return True
 
   @_RequireOpenQueue
@@ -1047,23 +1341,18 @@ class JobQueue(object):
     @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
-    @rtype: job ID
-    @return: the job ID of the newly created job
-    @raise errors.JobQueueDrainError: if the job is marked for draining
+    @rtype: L{_QueuedJob}
+    @return: the job object to be queued
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+    @raise errors.JobQueueFull: if the job queue has too many jobs in it
 
     """
-    if self._IsQueueMarkedDrain():
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    if self._drained:
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
-    # Check job queue size
-    size = len(self._ListJobFiles())
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
-
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
@@ -1071,15 +1360,14 @@ class JobQueue(object):
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
-    # Add to worker pool
-    self._wpool.AddTask(job)
-
-    return job.id
+    return job
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
@@ -1088,9 +1376,10 @@ class JobQueue(object):
 
     """
     job_id = self._NewSerialsUnlocked(1)[0]
-    return self._SubmitJobUnlocked(job_id, ops)
+    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
+    return job_id
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
@@ -1099,20 +1388,23 @@ class JobQueue(object):
 
     """
     results = []
+    tasks = []
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
     for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(job_id, ops)
+        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
         status = True
+        data = job_id
       except errors.GenericError, err:
         data = str(err)
         status = False
       results.append((status, data))
+    self._wpool.AddManyTasks(tasks)
 
     return results
 
   @_RequireOpenQueue
-  def UpdateJobUnlocked(self, job):
+  def UpdateJobUnlocked(self, job, replicate=True):
     """Update a job's on disk storage.
 
     After a job has been modified, this function needs to be called in
@@ -1121,18 +1413,15 @@ class JobQueue(object):
 
     @type job: L{_QueuedJob}
     @param job: the changed job
+    @type replicate: boolean
+    @param replicate: whether to replicate the change to remote nodes
 
     """
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
-    self._WriteAndReplicateFileUnlocked(filename, data)
-
-    # Notify waiters about potential changes
-    job.change.notifyAll()
+    self._UpdateJobQueueFile(filename, data, replicate)
 
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
@@ -1146,7 +1435,7 @@ class JobQueue(object):
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
-    @param timeout: maximum time to wait
+    @param timeout: maximum time to wait in seconds
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
@@ -1157,46 +1446,14 @@ class JobQueue(object):
         as such by the clients
 
     """
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return None
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
 
-    def _CheckForChanges():
-      logging.debug("Waiting for changes in job %s", job_id)
+    helper = _WaitForJobChangesHelper()
 
-      status = job.CalcStatus()
-      job_info = self._GetJobInfoUnlocked(job, fields)
-      log_entries = job.GetLogEntries(prev_log_serial)
-
-      # Serializing and deserializing data can cause type changes (e.g. from
-      # tuple to list) or precision loss. We're doing it here so that we get
-      # the same modifications as the data received from the client. Without
-      # this, the comparison afterwards might fail without the data being
-      # significantly different.
-      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-      # Don't even try to wait if the job is no longer running, there will be
-      # no changes.
-      if (status not in (constants.JOB_STATUS_QUEUED,
-                         constants.JOB_STATUS_RUNNING,
-                         constants.JOB_STATUS_WAITLOCK) or
-          prev_job_info != job_info or
-          (log_entries and prev_log_serial != log_entries[0][0])):
-        logging.debug("Job %s changed", job_id)
-        return (job_info, log_entries)
-
-      raise utils.RetryAgain()
+    return helper(self._GetJobPath(job_id), load_fn,
+                  fields, prev_job_info, prev_log_serial, timeout)
 
-    try:
-      # Setting wait function to release the queue lock while waiting
-      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
-                         wait_fn=job.change.wait)
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
-
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
@@ -1222,29 +1479,16 @@ class JobQueue(object):
       return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
-      self.CancelJobUnlocked(job)
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
       return (True, "Job %s canceled" % job.id)
 
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
-      try:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      finally:
-        self.UpdateJobUnlocked(job)
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % job.id)
 
   @_RequireOpenQueue
-  def CancelJobUnlocked(self, job):
-    """Marks a job as canceled.
-
-    """
-    try:
-      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                            "Job canceled by request")
-    finally:
-      self.UpdateJobUnlocked(job)
-
-  @_RequireOpenQueue
   def _ArchiveJobsUnlocked(self, jobs):
     """Archives jobs.
 
@@ -1257,9 +1501,7 @@ class JobQueue(object):
     archive_jobs = []
     rename_files = []
     for job in jobs:
-      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                  constants.JOB_STATUS_SUCCESS,
-                                  constants.JOB_STATUS_ERROR):
+      if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
 
@@ -1275,9 +1517,14 @@ class JobQueue(object):
     logging.debug("Successfully archived job(s) %s",
                   utils.CommaJoin(job.id for job in archive_jobs))
 
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
@@ -1299,7 +1546,7 @@ class JobQueue(object):
 
     return self._ArchiveJobsUnlocked([job]) == 1
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
@@ -1320,7 +1567,7 @@ class JobQueue(object):
     archived_count = 0
     last_touched = 0
 
-    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    all_job_ids = self._GetJobIDsUnlocked()
     pending = []
     for idx, job_id in enumerate(all_job_ids):
       last_touched = idx + 1
@@ -1355,62 +1602,9 @@ class JobQueue(object):
 
     return (archived_count, len(all_job_ids) - last_touched)
 
-  @staticmethod
-  def _GetJobInfoUnlocked(job, fields):
-    """Returns information about a job.
-
-    @type job: L{_QueuedJob}
-    @param job: the job which we query
-    @type fields: list
-    @param fields: names of fields to return
-    @rtype: list
-    @return: list with one element for each field
-    @raise errors.OpExecError: when an invalid field
-        has been passed
-
-    """
-    row = []
-    for fname in fields:
-      if fname == "id":
-        row.append(job.id)
-      elif fname == "status":
-        row.append(job.CalcStatus())
-      elif fname == "ops":
-        row.append([op.input.__getstate__() for op in job.ops])
-      elif fname == "opresult":
-        row.append([op.result for op in job.ops])
-      elif fname == "opstatus":
-        row.append([op.status for op in job.ops])
-      elif fname == "oplog":
-        row.append([op.log for op in job.ops])
-      elif fname == "opstart":
-        row.append([op.start_timestamp for op in job.ops])
-      elif fname == "opexec":
-        row.append([op.exec_timestamp for op in job.ops])
-      elif fname == "opend":
-        row.append([op.end_timestamp for op in job.ops])
-      elif fname == "received_ts":
-        row.append(job.received_timestamp)
-      elif fname == "start_ts":
-        row.append(job.start_timestamp)
-      elif fname == "end_ts":
-        row.append(job.end_timestamp)
-      elif fname == "lock_status":
-        row.append(job.lock_status)
-      elif fname == "summary":
-        row.append([op.input.Summary() for op in job.ops])
-      else:
-        raise errors.OpExecError("Invalid job query field '%s'" % fname)
-    return row
-
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def QueryJobs(self, job_ids, fields):
     """Returns a list of jobs in queue.
 
-    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
-    processing for each job.
-
     @type job_ids: list
     @param job_ids: sequence of job identifiers or None for all
     @type fields: list
@@ -1421,16 +1615,23 @@ class JobQueue(object):
 
     """
     jobs = []
+    list_all = False
+    if not job_ids:
+      # Since files are added to/removed from the queue atomically, there's no
+      # risk of getting the job ids in an inconsistent state.
+      job_ids = self._GetJobIDsUnlocked()
+      list_all = True
 
-    for job in self._GetJobsUnlocked(job_ids):
-      if job is None:
+    for job_id in job_ids:
+      job = self.SafeLoadJobFromDisk(job_id)
+      if job is not None:
+        jobs.append(job.GetInfo(fields))
+      elif not list_all:
         jobs.append(None)
-      else:
-        jobs.append(self._GetJobInfoUnlocked(job, fields))
 
     return jobs
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
@@ -1440,5 +1641,5 @@ class JobQueue(object):
     """
     self._wpool.TerminateWorkers()
 
-    self._queue_lock.Close()
-    self._queue_lock = None
+    self._queue_filelock.Close()
+    self._queue_filelock = None