Document OpNodeMigrate's result for RAPI
[ganeti-local] / lib / jqueue.py
index 3b1d61a..d5ea3cb 100644 (file)
@@ -31,13 +31,13 @@ used by all other classes in this module.
 
 import logging
 import errno
-import re
 import time
 import weakref
 import threading
+import itertools
 
 try:
-  # pylint: disable-msg=E0611
+  # pylint: disable=E0611
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
@@ -177,7 +177,7 @@ class _QueuedJob(object):
   @ivar writable: Whether the job is allowed to be modified
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__", "processor_lock", "writable"]
@@ -217,7 +217,12 @@ class _QueuedJob(object):
     obj.writable = writable
     obj.ops_iter = None
     obj.cur_opctx = None
-    obj.processor_lock = threading.Lock()
+
+    # Read-only jobs are not processed and therefore don't need a lock
+    if writable:
+      obj.processor_lock = threading.Lock()
+    else:
+      obj.processor_lock = None
 
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
@@ -307,8 +312,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
-      elif op.status == constants.OP_STATUS_WAITLOCK:
-        status = constants.JOB_STATUS_WAITLOCK
+      elif op.status == constants.OP_STATUS_WAITING:
+        status = constants.JOB_STATUS_WAITING
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
@@ -456,7 +461,7 @@ class _QueuedJob(object):
       self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
-    elif status == constants.JOB_STATUS_WAITLOCK:
+    elif status == constants.JOB_STATUS_WAITING:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % self.id)
@@ -502,11 +507,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     This is called from the mcpu code as a notifier function, when the LU is
     finally about to start the Exec() method. Of course, to have end-user
     visible results, the opcode must be initially (before calling into
-    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    Processor.ExecOpCode) set to OP_STATUS_WAITING.
 
     """
     assert self._op in self._job.ops
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -550,7 +555,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     """Check whether job has been cancelled.
 
     """
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -611,7 +616,7 @@ class _JobChangesChecker(object):
     # no changes.
     if (status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
-                       constants.JOB_STATUS_WAITLOCK) or
+                       constants.JOB_STATUS_WAITING) or
         job_info != self._prev_job_info or
         (log_entries and self._prev_log_serial != log_entries[0][0])):
       logging.debug("Job %s changed", job.id)
@@ -716,7 +721,13 @@ class _WaitForJobChangesHelper(object):
 
   """
   @staticmethod
-  def _CheckForChanges(job_load_fn, check_fn):
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
@@ -745,12 +756,13 @@ class _WaitForJobChangesHelper(object):
     @param timeout: maximum time to wait in seconds
 
     """
+    counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
       waiter = _JobChangesWaiter(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
-                                          job_load_fn, check_fn),
+                                          counter, job_load_fn, check_fn),
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
@@ -858,6 +870,10 @@ class _OpExecContext:
 
 
 class _JobProcessor(object):
+  (DEFER,
+   WAITDEP,
+   FINISHED) = range(1, 4)
+
   def __init__(self, queue, opexec_fn, job,
                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
     """Initializes this class.
@@ -922,14 +938,14 @@ class _JobProcessor(object):
     """
     assert op in job.ops
     assert op.status in (constants.OP_STATUS_QUEUED,
-                         constants.OP_STATUS_WAITLOCK)
+                         constants.OP_STATUS_WAITING)
 
     update = False
 
     op.result = None
 
     if op.status == constants.OP_STATUS_QUEUED:
-      op.status = constants.OP_STATUS_WAITLOCK
+      op.status = constants.OP_STATUS_WAITING
       update = True
 
     if op.start_timestamp is None:
@@ -940,7 +956,7 @@ class _JobProcessor(object):
       job.start_timestamp = op.start_timestamp
       update = True
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     return update
 
@@ -1006,7 +1022,7 @@ class _JobProcessor(object):
     """
     op = opctx.op
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     timeout = opctx.GetNextLockTimeout()
 
@@ -1019,7 +1035,7 @@ class _JobProcessor(object):
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
 
-      assert op.status in (constants.OP_STATUS_WAITLOCK,
+      assert op.status in (constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       # Was job cancelled while we were waiting for the lock?
@@ -1027,12 +1043,12 @@ class _JobProcessor(object):
         return (constants.OP_STATUS_CANCELING, None)
 
       # Stay in waitlock while trying to re-acquire lock
-      return (constants.OP_STATUS_WAITLOCK, None)
+      return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
@@ -1045,9 +1061,9 @@ class _JobProcessor(object):
     """Continues execution of a job.
 
     @param _nextop_fn: Callback function for tests
-    @rtype: bool
-    @return: True if job is finished, False if processor needs to be called
-             again
+    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
+      be deferred and C{WAITDEP} if the dependency manager
+      (L{_JobDependencyManager}) will re-schedule the job when appropriate
 
     """
     queue = self.queue
@@ -1063,7 +1079,7 @@ class _JobProcessor(object):
 
       # Don't do anything for finalized jobs
       if job.CalcStatus() in constants.JOBS_FINALIZED:
-        return True
+        return self.FINISHED
 
       # Is a previous opcode still pending?
       if job.cur_opctx:
@@ -1082,7 +1098,7 @@ class _JobProcessor(object):
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
-                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
@@ -1092,22 +1108,22 @@ class _JobProcessor(object):
 
       if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
-                             constants.OP_STATUS_WAITLOCK)
+                             constants.OP_STATUS_WAITING)
 
         # Prepare to start opcode
         if self._MarkWaitlock(job, op):
           # Write to disk
           queue.UpdateJobUnlocked(job)
 
-        assert op.status == constants.OP_STATUS_WAITLOCK
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert op.status == constants.OP_STATUS_WAITING
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
         assert job.start_timestamp and op.start_timestamp
         assert waitjob is None
 
         # Check if waiting for a job is necessary
         waitjob = self._CheckDependencies(queue, job, opctx)
 
-        assert op.status in (constants.OP_STATUS_WAITLOCK,
+        assert op.status in (constants.OP_STATUS_WAITING,
                              constants.OP_STATUS_CANCELING,
                              constants.OP_STATUS_ERROR)
 
@@ -1129,7 +1145,7 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITLOCK:
+        if op.status == constants.OP_STATUS_WAITING:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1142,7 +1158,7 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
+      if op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1156,7 +1172,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1208,13 +1224,14 @@ class _JobProcessor(object):
 
         if finalize:
           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
-          # TODO: Check locking
-          queue.depmgr.NotifyWaiters(job.id)
-          return True
+          return self.FINISHED
 
       assert not waitjob or queue.depmgr.JobWaiting(job)
 
-      return bool(waitjob)
+      if waitjob:
+        return self.WAITDEP
+      else:
+        return self.DEFER
     finally:
       assert job.writable, "Job became read-only while being processed"
       queue.release()
@@ -1224,13 +1241,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job): # pylint: disable-msg=W0221
+  def RunTask(self, job): # pylint: disable=W0221
     """Job executor.
 
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
+    assert job.writable, "Expected writable job"
+
     # Ensure only one worker is active on a single job. If a job registers for
     # a dependency job, and the other job notifies before the first worker is
     # done, the job can end up in the tasklist more than once.
@@ -1258,10 +1277,24 @@ class _JobQueueWorker(workerpool.BaseWorker):
     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                     proc.ExecOpCode)
 
-    if not _JobProcessor(queue, wrap_execop_fn, job)():
+    result = _JobProcessor(queue, wrap_execop_fn, job)()
+
+    if result == _JobProcessor.FINISHED:
+      # Notify waiting jobs
+      queue.depmgr.NotifyWaiters(job.id)
+
+    elif result == _JobProcessor.DEFER:
       # Schedule again
       raise workerpool.DeferTask(priority=job.CalcPriority())
 
+    elif result == _JobProcessor.WAITDEP:
+      # No-op, dependency manager will re-schedule
+      pass
+
+    else:
+      raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                   (result, ))
+
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
     """Updates the worker thread name to include a short summary of the opcode.
@@ -1314,8 +1347,6 @@ class _JobDependencyManager:
    CONTINUE,
    WRONGSTATUS) = range(1, 6)
 
-  # TODO: Export waiter information to lock monitor
-
   def __init__(self, getstatus_fn, enqueue_fn):
     """Initializes this class.
 
@@ -1327,6 +1358,22 @@ class _JobDependencyManager:
     self._lock = locking.SharedLock("JobDepMgr")
 
   @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1434,7 +1481,7 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
-    # pylint: disable-msg=W0212
+    # pylint: disable=W0212
     assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
@@ -1443,11 +1490,7 @@ def _RequireOpenQueue(fn):
 class JobQueue(object):
   """Queue used to manage the jobs.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
-
   """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
-
   def __init__(self, context):
     """Constructor for JobQueue.
 
@@ -1501,6 +1544,7 @@ class JobQueue(object):
     # Job dependencies
     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                         self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1546,11 +1590,11 @@ class JobQueue(object):
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
-                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_WAITING,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
 
-        if status == constants.JOB_STATUS_WAITLOCK:
+        if status == constants.JOB_STATUS_WAITING:
           # Restart job
           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
           restartjobs.append(job)
@@ -1563,7 +1607,7 @@ class JobQueue(object):
 
     if restartjobs:
       logging.info("Restarting %s jobs", len(restartjobs))
-      self._EnqueueJobs(restartjobs)
+      self._EnqueueJobsUnlocked(restartjobs)
 
     logging.info("Job queue inspection finished")
 
@@ -1754,7 +1798,8 @@ class JobQueue(object):
     @return: a string representing the job identifier.
 
     """
-    assert count > 0
+    assert ht.TPositiveInt(count)
+
     # New number
     serial = self._last_serial + count
 
@@ -1797,7 +1842,8 @@ class JobQueue(object):
     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
-  def _GetJobIDsUnlocked(self, sort=True):
+  @staticmethod
+  def _GetJobIDsUnlocked(sort=True):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1812,7 +1858,7 @@ class JobQueue(object):
     """
     jlist = []
     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = self._RE_JOB_FILE.match(filename)
+      m = constants.JOB_FILE_RE.match(filename)
       if m:
         jlist.append(m.group(1))
     if sort:
@@ -1900,7 +1946,7 @@ class JobQueue(object):
     try:
       data = serializer.LoadJson(raw_data)
       job = _QueuedJob.Restore(self, data, writable)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
@@ -2008,7 +2054,7 @@ class JobQueue(object):
 
     """
     (job_id, ) = self._NewSerialsUnlocked(1)
-    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
+    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
     return job_id
 
   @locking.ssynchronized(_LOCK)
@@ -2024,7 +2070,7 @@ class JobQueue(object):
     (results, added_jobs) = \
       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
 
-    self._EnqueueJobs(added_jobs)
+    self._EnqueueJobsUnlocked(added_jobs)
 
     return results
 
@@ -2105,6 +2151,7 @@ class JobQueue(object):
 
     return (results, added_jobs)
 
+  @locking.ssynchronized(_LOCK)
   def _EnqueueJobs(self, jobs):
     """Helper function to add jobs to worker pool's queue.
 
@@ -2112,6 +2159,16 @@ class JobQueue(object):
     @param jobs: List of all jobs
 
     """
+    return self._EnqueueJobsUnlocked(jobs)
+
+  def _EnqueueJobsUnlocked(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
+
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
                              priority=[job.CalcPriority() for job in jobs])
 
@@ -2131,7 +2188,7 @@ class JobQueue(object):
     # Try to load from disk
     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
 
-    assert not job.writable, "Got writable job"
+    assert not job.writable, "Got writable job" # pylint: disable=E1101
 
     if job:
       return job.CalcStatus()