Support IPv6 cluster init
[ganeti-local] / lib / jqueue.py
index 1165ad4..e54fe96 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -31,26 +31,39 @@ used by all other classes in this module.
 
 import os
 import logging
-import threading
 import errno
 import re
 import time
 import weakref
 
+try:
+  # pylint: disable-msg=E0611
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
+from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
+from ganeti import locking
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
+from ganeti import netutils
+from ganeti import compat
 
 
 JOBQUEUE_THREADS = 25
 JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
+# member lock names to be passed to @ssynchronized decorator
+_LOCK = "_lock"
+_QUEUE = "_queue"
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -155,7 +168,6 @@ class _QueuedJob(object):
   @ivar start_timestmap: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
   @ivar lock_status: In-memory locking information for debugging
-  @ivar change: a Condition variable we use for waiting for job changes
 
   """
   # pylint: disable-msg=W0212
@@ -190,9 +202,6 @@ class _QueuedJob(object):
     # In-memory attributes
     self.lock_status = None
 
-    # Condition to wait for changes
-    self.change = threading.Condition(self.queue._lock)
-
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
@@ -230,9 +239,6 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
-    # Condition to wait for changes
-    obj.change = threading.Condition(obj.queue._lock)
-
     return obj
 
   def Serialize(self):
@@ -381,14 +387,17 @@ class _QueuedJob(object):
     @param result: the opcode result
 
     """
-    not_marked = True
-    for op in self.ops:
-      if op.status in constants.OPS_FINALIZED:
-        assert not_marked, "Finalized opcodes found after non-finalized ones"
-        continue
-      op.status = status
-      op.result = result
-      not_marked = False
+    try:
+      not_marked = True
+      for op in self.ops:
+        if op.status in constants.OPS_FINALIZED:
+          assert not_marked, "Finalized opcodes found after non-finalized ones"
+          continue
+        op.status = status
+        op.result = result
+        not_marked = False
+    finally:
+      self.queue.UpdateJobUnlocked(self)
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -411,6 +420,16 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._job = job
     self._op = op
 
+  def _CheckCancel(self):
+    """Raises an exception to cancel the job if asked to.
+
+    """
+    # Cancel here if we were asked to
+    if self._op.status == constants.OP_STATUS_CANCELING:
+      logging.debug("Canceling opcode")
+      raise CancelJob()
+
+  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -420,22 +439,30 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
-    self._queue.acquire()
-    try:
-      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
-                                 constants.OP_STATUS_CANCELING)
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
 
-      # All locks are acquired by now
-      self._job.lock_status = None
+    # All locks are acquired by now
+    self._job.lock_status = None
 
-      # Cancel here if we were asked to
-      if self._op.status == constants.OP_STATUS_CANCELING:
-        raise CancelJob()
+    # Cancel here if we were asked to
+    self._CheckCancel()
 
-      self._op.status = constants.OP_STATUS_RUNNING
-      self._op.exec_timestamp = TimeStampNow()
-    finally:
-      self._queue.release()
+    logging.debug("Opcode is now running")
+    self._op.status = constants.OP_STATUS_RUNNING
+    self._op.exec_timestamp = TimeStampNow()
+
+    # And finally replicate the job status
+    self._queue.UpdateJobUnlocked(self._job)
+
+  @locking.ssynchronized(_QUEUE, shared=1)
+  def _AppendFeedback(self, timestamp, log_type, log_msg):
+    """Internal feedback append function, with locks
+
+    """
+    self._job.log_serial += 1
+    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+    self._queue.UpdateJobUnlocked(self._job, replicate=False)
 
   def Feedback(self, *args):
     """Append a log entry.
@@ -452,15 +479,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # The time is split to make serialization easier and not lose
     # precision.
     timestamp = utils.SplitTime(time.time())
-
-    self._queue.acquire()
-    try:
-      self._job.log_serial += 1
-      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
-
-      self._job.change.notifyAll()
-    finally:
-      self._queue.release()
+    self._AppendFeedback(timestamp, log_type, log_msg)
 
   def ReportLocks(self, msg):
     """Write locking information to the job.
@@ -468,9 +487,208 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Called whenever the LU processor is waiting for a lock or has acquired one.
 
     """
+    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                               constants.OP_STATUS_CANCELING)
+
     # Not getting the queue lock because this is a single assignment
     self._job.lock_status = msg
 
+    # Cancel here if we were asked to
+    self._CheckCancel()
+
+
+class _JobChangesChecker(object):
+  def __init__(self, fields, prev_job_info, prev_log_serial):
+    """Initializes this class.
+
+    @type fields: list of strings
+    @param fields: Fields requested by LUXI client
+    @type prev_job_info: string
+    @param prev_job_info: previous job info, as passed by the LUXI client
+    @type prev_log_serial: string
+    @param prev_log_serial: previous job serial, as passed by the LUXI client
+
+    """
+    self._fields = fields
+    self._prev_job_info = prev_job_info
+    self._prev_log_serial = prev_log_serial
+
+  def __call__(self, job):
+    """Checks whether job has changed.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+
+    """
+    status = job.CalcStatus()
+    job_info = job.GetInfo(self._fields)
+    log_entries = job.GetLogEntries(self._prev_log_serial)
+
+    # Serializing and deserializing data can cause type changes (e.g. from
+    # tuple to list) or precision loss. We're doing it here so that we get
+    # the same modifications as the data received from the client. Without
+    # this, the comparison afterwards might fail without the data being
+    # significantly different.
+    # TODO: we just deserialized from disk, investigate how to make sure that
+    # the job info and log entries are compatible to avoid this further step.
+    # TODO: Doing something like in testutils.py:UnifyValueType might be more
+    # efficient, though floats will be tricky
+    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+    # Don't even try to wait if the job is no longer running, there will be
+    # no changes.
+    if (status not in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_RUNNING,
+                       constants.JOB_STATUS_WAITLOCK) or
+        job_info != self._prev_job_info or
+        (log_entries and self._prev_log_serial != log_entries[0][0])):
+      logging.debug("Job %s changed", job.id)
+      return (job_info, log_entries)
+
+    return None
+
+
+class _JobFileChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+    @raises errors.InotifyError: if the notifier cannot be setup
+
+    """
+    self._wm = pyinotify.WatchManager()
+    self._inotify_handler = \
+      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
+    self._notifier = \
+      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
+    try:
+      self._inotify_handler.enable()
+    except Exception:
+      # pyinotify doesn't close file descriptors automatically
+      self._notifier.stop()
+      raise
+
+  def _OnInotify(self, notifier_enabled):
+    """Callback for inotify.
+
+    """
+    if not notifier_enabled:
+      self._inotify_handler.enable()
+
+  def Wait(self, timeout):
+    """Waits for the job file to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    assert timeout >= 0
+    have_events = self._notifier.check_events(timeout * 1000)
+    if have_events:
+      self._notifier.read_events()
+    self._notifier.process_events()
+    return have_events
+
+  def Close(self):
+    """Closes underlying notifier and its file descriptor.
+
+    """
+    self._notifier.stop()
+
+
+class _JobChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+
+    """
+    self._filewaiter = None
+    self._filename = filename
+
+  def Wait(self, timeout):
+    """Waits for a job to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    if self._filewaiter:
+      return self._filewaiter.Wait(timeout)
+
+    # Lazy setup: Avoid inotify setup cost when job file has already changed.
+    # If this point is reached, return immediately and let caller check the job
+    # file again in case there were changes since the last check. This avoids a
+    # race condition.
+    self._filewaiter = _JobFileChangesWaiter(self._filename)
+
+    return True
+
+  def Close(self):
+    """Closes underlying waiter.
+
+    """
+    if self._filewaiter:
+      self._filewaiter.Close()
+
+
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
+
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
+
+  """
+  @staticmethod
+  def _CheckForChanges(job_load_fn, check_fn):
+    job = job_load_fn()
+    if not job:
+      raise errors.JobLost()
+
+    result = check_fn(job)
+    if result is None:
+      raise utils.RetryAgain()
+
+    return result
+
+  def __call__(self, filename, job_load_fn,
+               fields, prev_job_info, prev_log_serial, timeout):
+    """Waits for changes on a job.
+
+    @type filename: string
+    @param filename: File on which to wait for changes
+    @type job_load_fn: callable
+    @param job_load_fn: Function to load job
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait in seconds
+
+    """
+    try:
+      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
+      waiter = _JobChangesWaiter(filename)
+      try:
+        return utils.Retry(compat.partial(self._CheckForChanges,
+                                          job_load_fn, check_fn),
+                           utils.RETRY_REMAINING_TIME, timeout,
+                           wait_fn=waiter.Wait)
+      finally:
+        waiter.Close()
+    except (errors.InotifyError, errors.JobLost):
+      return None
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -507,11 +725,14 @@ class _JobQueueWorker(workerpool.BaseWorker):
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               if op.status == constants.OP_STATUS_CANCELED:
+                logging.debug("Canceling opcode")
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
+              logging.debug("Opcode %s/%s waiting for locks",
+                            idx + 1, count)
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -527,11 +748,20 @@ class _JobQueueWorker(workerpool.BaseWorker):
             result = proc.ExecOpCode(input_opcode,
                                      _OpExecCallbacks(queue, job, op))
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
+              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
               op.status = constants.OP_STATUS_SUCCESS
               op.result = result
               op.end_timestamp = TimeStampNow()
+              if idx == count - 1:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
+
+                # Consistency check
+                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
+                                  for i in job.ops)
+
               queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
@@ -542,27 +772,46 @@ class _JobQueueWorker(workerpool.BaseWorker):
             # Will be handled further up
             raise
           except Exception, err:
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
               try:
+                logging.debug("Opcode %s/%s failed", idx + 1, count)
                 op.status = constants.OP_STATUS_ERROR
                 if isinstance(err, errors.GenericError):
-                  op.result = errors.EncodeException(err)
+                  to_encode = err
                 else:
-                  op.result = str(err)
+                  to_encode = errors.OpExecError(str(err))
+                op.result = errors.EncodeException(to_encode)
                 op.end_timestamp = TimeStampNow()
                 logging.info("Op %s/%s: Error in opcode %s: %s",
                              idx + 1, count, op_summary, err)
+
+                to_encode = errors.OpExecError("Preceding opcode failed")
+                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                      errors.EncodeException(to_encode))
+
+                # Consistency check
+                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
+                                  for i in job.ops[:idx])
+                assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                                  errors.GetEncodedError(i.result)
+                                  for i in job.ops[idx:])
               finally:
+                job.lock_status = None
+                job.end_timestamp = TimeStampNow()
                 queue.UpdateJobUnlocked(job)
             finally:
               queue.release()
             raise
 
       except CancelJob:
-        queue.acquire()
+        queue.acquire(shared=1)
         try:
-          queue.CancelJobUnlocked(job)
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          job.lock_status = None
+          job.end_timestamp = TimeStampNow()
+          queue.UpdateJobUnlocked(job)
         finally:
           queue.release()
       except errors.GenericError, err:
@@ -570,19 +819,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
       except:
         logging.exception("Unhandled exception")
     finally:
-      queue.acquire()
-      try:
-        try:
-          job.lock_status = None
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          job_id = job.id
-          status = job.CalcStatus()
-      finally:
-        queue.release()
-
-      logging.info("Finished job %s, status = %s", job_id, status)
+      status = job.CalcStatus()
+      logging.info("Finished job %s, status = %s", job.id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -605,10 +843,10 @@ def _RequireOpenQueue(fn):
   that the decorated function is called with a first argument that has
   a '_queue_filelock' argument.
 
-  @warning: Use this decorator only after utils.LockedMethod!
+  @warning: Use this decorator only after locking.ssynchronized
 
   Example::
-    @utils.LockedMethod
+    @locking.ssynchronized(_LOCK)
     @_RequireOpenQueue
     def Example(self):
       pass
@@ -644,10 +882,15 @@ class JobQueue(object):
     """
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
-    self._my_hostname = utils.HostInfo().name
+    self._my_hostname = netutils.Hostname.GetSysName()
+
+    # The Big JobQueue lock. If a code block or method acquires it in shared
+    # mode safe it must guarantee concurrency with all the code acquiring it in
+    # shared mode, including itself. In order not to acquire it at all
+    # concurrency must be guaranteed with all code acquiring it in shared mode
+    # and all code acquiring it exclusively.
+    self._lock = locking.SharedLock("JobQueue")
 
-    # Locking
-    self._lock = threading.Lock()
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
@@ -703,17 +946,14 @@ class JobQueue(object):
           status = job.CalcStatus()
 
           if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask(job)
+            self._wpool.AddTask((job, ))
 
           elif status in (constants.JOB_STATUS_RUNNING,
                           constants.JOB_STATUS_WAITLOCK,
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
-            try:
-              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                    "Unclean master daemon shutdown")
-            finally:
-              self.UpdateJobUnlocked(job)
+            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                  "Unclean master daemon shutdown")
 
         logging.info("Job queue inspection finished")
       finally:
@@ -722,7 +962,7 @@ class JobQueue(object):
       self._wpool.TerminateWorkers()
       raise
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
     """Register a new node with the queue.
@@ -767,7 +1007,7 @@ class JobQueue(object):
 
     self._nodes[node_name] = node.primary_ip
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     """Callback called when removing nodes from the cluster.
@@ -818,6 +1058,7 @@ class JobQueue(object):
         names and the second one with the node addresses
 
     """
+    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
     name_list = self._nodes.keys()
     addr_list = [self._nodes[name] for name in name_list]
     return name_list, addr_list
@@ -987,6 +1228,8 @@ class JobQueue(object):
 
     try:
       job = self._LoadJobFromDisk(job_id)
+      if job is None:
+        return job
     except errors.JobFileCorrupted:
       old_path = self._GetJobPath(job_id)
       new_path = self._GetArchivedJobPath(job_id)
@@ -1069,7 +1312,7 @@ class JobQueue(object):
     """
     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
@@ -1124,7 +1367,7 @@ class JobQueue(object):
 
     return job
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
@@ -1133,10 +1376,10 @@ class JobQueue(object):
 
     """
     job_id = self._NewSerialsUnlocked(1)[0]
-    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
+    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
     return job_id
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
@@ -1179,11 +1422,6 @@ class JobQueue(object):
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
-    # Notify waiters about potential changes
-    job.change.notifyAll()
-
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
@@ -1197,7 +1435,7 @@ class JobQueue(object):
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
-    @param timeout: maximum time to wait
+    @param timeout: maximum time to wait in seconds
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
@@ -1208,46 +1446,14 @@ class JobQueue(object):
         as such by the clients
 
     """
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return None
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
 
-    def _CheckForChanges():
-      logging.debug("Waiting for changes in job %s", job_id)
+    helper = _WaitForJobChangesHelper()
 
-      status = job.CalcStatus()
-      job_info = job.GetInfo(fields)
-      log_entries = job.GetLogEntries(prev_log_serial)
-
-      # Serializing and deserializing data can cause type changes (e.g. from
-      # tuple to list) or precision loss. We're doing it here so that we get
-      # the same modifications as the data received from the client. Without
-      # this, the comparison afterwards might fail without the data being
-      # significantly different.
-      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-      # Don't even try to wait if the job is no longer running, there will be
-      # no changes.
-      if (status not in (constants.JOB_STATUS_QUEUED,
-                         constants.JOB_STATUS_RUNNING,
-                         constants.JOB_STATUS_WAITLOCK) or
-          prev_job_info != job_info or
-          (log_entries and prev_log_serial != log_entries[0][0])):
-        logging.debug("Job %s changed", job_id)
-        return (job_info, log_entries)
+    return helper(self._GetJobPath(job_id), load_fn,
+                  fields, prev_job_info, prev_log_serial, timeout)
 
-      raise utils.RetryAgain()
-
-    try:
-      # Setting wait function to release the queue lock while waiting
-      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
-                         wait_fn=job.change.wait)
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
-
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
@@ -1273,29 +1479,16 @@ class JobQueue(object):
       return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
-      self.CancelJobUnlocked(job)
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
       return (True, "Job %s canceled" % job.id)
 
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
-      try:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      finally:
-        self.UpdateJobUnlocked(job)
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % job.id)
 
   @_RequireOpenQueue
-  def CancelJobUnlocked(self, job):
-    """Marks a job as canceled.
-
-    """
-    try:
-      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                            "Job canceled by request")
-    finally:
-      self.UpdateJobUnlocked(job)
-
-  @_RequireOpenQueue
   def _ArchiveJobsUnlocked(self, jobs):
     """Archives jobs.
 
@@ -1308,9 +1501,7 @@ class JobQueue(object):
     archive_jobs = []
     rename_files = []
     for job in jobs:
-      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                  constants.JOB_STATUS_SUCCESS,
-                                  constants.JOB_STATUS_ERROR):
+      if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
 
@@ -1333,7 +1524,7 @@ class JobQueue(object):
     self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
@@ -1355,7 +1546,7 @@ class JobQueue(object):
 
     return self._ArchiveJobsUnlocked([job]) == 1
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
@@ -1440,7 +1631,7 @@ class JobQueue(object):
 
     return jobs
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.