Document OpNodeMigrate's result for RAPI
[ganeti-local] / lib / jqueue.py
index ecbd82a..d5ea3cb 100644 (file)
@@ -31,13 +31,13 @@ used by all other classes in this module.
 
 import logging
 import errno
-import re
 import time
 import weakref
 import threading
+import itertools
 
 try:
-  # pylint: disable-msg=E0611
+  # pylint: disable=E0611
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
@@ -177,7 +177,7 @@ class _QueuedJob(object):
   @ivar writable: Whether the job is allowed to be modified
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__", "processor_lock", "writable"]
@@ -312,8 +312,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
-      elif op.status == constants.OP_STATUS_WAITLOCK:
-        status = constants.JOB_STATUS_WAITLOCK
+      elif op.status == constants.OP_STATUS_WAITING:
+        status = constants.JOB_STATUS_WAITING
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
@@ -461,7 +461,7 @@ class _QueuedJob(object):
       self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
-    elif status == constants.JOB_STATUS_WAITLOCK:
+    elif status == constants.JOB_STATUS_WAITING:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % self.id)
@@ -507,11 +507,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     This is called from the mcpu code as a notifier function, when the LU is
     finally about to start the Exec() method. Of course, to have end-user
     visible results, the opcode must be initially (before calling into
-    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    Processor.ExecOpCode) set to OP_STATUS_WAITING.
 
     """
     assert self._op in self._job.ops
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -555,7 +555,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     """Check whether job has been cancelled.
 
     """
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -616,7 +616,7 @@ class _JobChangesChecker(object):
     # no changes.
     if (status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
-                       constants.JOB_STATUS_WAITLOCK) or
+                       constants.JOB_STATUS_WAITING) or
         job_info != self._prev_job_info or
         (log_entries and self._prev_log_serial != log_entries[0][0])):
       logging.debug("Job %s changed", job.id)
@@ -721,7 +721,13 @@ class _WaitForJobChangesHelper(object):
 
   """
   @staticmethod
-  def _CheckForChanges(job_load_fn, check_fn):
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
@@ -750,12 +756,13 @@ class _WaitForJobChangesHelper(object):
     @param timeout: maximum time to wait in seconds
 
     """
+    counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
       waiter = _JobChangesWaiter(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
-                                          job_load_fn, check_fn),
+                                          counter, job_load_fn, check_fn),
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
@@ -931,14 +938,14 @@ class _JobProcessor(object):
     """
     assert op in job.ops
     assert op.status in (constants.OP_STATUS_QUEUED,
-                         constants.OP_STATUS_WAITLOCK)
+                         constants.OP_STATUS_WAITING)
 
     update = False
 
     op.result = None
 
     if op.status == constants.OP_STATUS_QUEUED:
-      op.status = constants.OP_STATUS_WAITLOCK
+      op.status = constants.OP_STATUS_WAITING
       update = True
 
     if op.start_timestamp is None:
@@ -949,7 +956,7 @@ class _JobProcessor(object):
       job.start_timestamp = op.start_timestamp
       update = True
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     return update
 
@@ -1015,7 +1022,7 @@ class _JobProcessor(object):
     """
     op = opctx.op
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     timeout = opctx.GetNextLockTimeout()
 
@@ -1028,7 +1035,7 @@ class _JobProcessor(object):
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
 
-      assert op.status in (constants.OP_STATUS_WAITLOCK,
+      assert op.status in (constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       # Was job cancelled while we were waiting for the lock?
@@ -1036,12 +1043,12 @@ class _JobProcessor(object):
         return (constants.OP_STATUS_CANCELING, None)
 
       # Stay in waitlock while trying to re-acquire lock
-      return (constants.OP_STATUS_WAITLOCK, None)
+      return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
@@ -1091,7 +1098,7 @@ class _JobProcessor(object):
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
-                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
@@ -1101,22 +1108,22 @@ class _JobProcessor(object):
 
       if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
-                             constants.OP_STATUS_WAITLOCK)
+                             constants.OP_STATUS_WAITING)
 
         # Prepare to start opcode
         if self._MarkWaitlock(job, op):
           # Write to disk
           queue.UpdateJobUnlocked(job)
 
-        assert op.status == constants.OP_STATUS_WAITLOCK
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert op.status == constants.OP_STATUS_WAITING
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
         assert job.start_timestamp and op.start_timestamp
         assert waitjob is None
 
         # Check if waiting for a job is necessary
         waitjob = self._CheckDependencies(queue, job, opctx)
 
-        assert op.status in (constants.OP_STATUS_WAITLOCK,
+        assert op.status in (constants.OP_STATUS_WAITING,
                              constants.OP_STATUS_CANCELING,
                              constants.OP_STATUS_ERROR)
 
@@ -1138,7 +1145,7 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITLOCK:
+        if op.status == constants.OP_STATUS_WAITING:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1151,7 +1158,7 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
+      if op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1165,7 +1172,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1234,7 +1241,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job): # pylint: disable-msg=W0221
+  def RunTask(self, job): # pylint: disable=W0221
     """Job executor.
 
     @type job: L{_QueuedJob}
@@ -1340,8 +1347,6 @@ class _JobDependencyManager:
    CONTINUE,
    WRONGSTATUS) = range(1, 6)
 
-  # TODO: Export waiter information to lock monitor
-
   def __init__(self, getstatus_fn, enqueue_fn):
     """Initializes this class.
 
@@ -1353,6 +1358,22 @@ class _JobDependencyManager:
     self._lock = locking.SharedLock("JobDepMgr")
 
   @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1460,7 +1481,7 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
-    # pylint: disable-msg=W0212
+    # pylint: disable=W0212
     assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
@@ -1469,11 +1490,7 @@ def _RequireOpenQueue(fn):
 class JobQueue(object):
   """Queue used to manage the jobs.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
-
   """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
-
   def __init__(self, context):
     """Constructor for JobQueue.
 
@@ -1527,6 +1544,7 @@ class JobQueue(object):
     # Job dependencies
     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                         self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1572,11 +1590,11 @@ class JobQueue(object):
         restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
-                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_WAITING,
                       constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
 
-        if status == constants.JOB_STATUS_WAITLOCK:
+        if status == constants.JOB_STATUS_WAITING:
           # Restart job
           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
           restartjobs.append(job)
@@ -1780,7 +1798,8 @@ class JobQueue(object):
     @return: a string representing the job identifier.
 
     """
-    assert count > 0
+    assert ht.TPositiveInt(count)
+
     # New number
     serial = self._last_serial + count
 
@@ -1823,7 +1842,8 @@ class JobQueue(object):
     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
-  def _GetJobIDsUnlocked(self, sort=True):
+  @staticmethod
+  def _GetJobIDsUnlocked(sort=True):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1838,7 +1858,7 @@ class JobQueue(object):
     """
     jlist = []
     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = self._RE_JOB_FILE.match(filename)
+      m = constants.JOB_FILE_RE.match(filename)
       if m:
         jlist.append(m.group(1))
     if sort:
@@ -1926,7 +1946,7 @@ class JobQueue(object):
     try:
       data = serializer.LoadJson(raw_data)
       job = _QueuedJob.Restore(self, data, writable)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
@@ -2168,7 +2188,7 @@ class JobQueue(object):
     # Try to load from disk
     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
 
-    assert not job.writable, "Got writable job"
+    assert not job.writable, "Got writable job" # pylint: disable=E1101
 
     if job:
       return job.CalcStatus()