Implement job 'waiting' status
authorIustin Pop <iustin@google.com>
Tue, 7 Oct 2008 08:03:19 +0000 (08:03 +0000)
committerIustin Pop <iustin@google.com>
Tue, 7 Oct 2008 08:03:19 +0000 (08:03 +0000)
Background: when we have multiple jobs in the queue (more than just a
few), many of the jobs (up to the number of threads) will be in state
'running', although many of them could be actually blocked, waiting for
some locks. This is not good, as one cannot easily see what is
happening.

The patch extends the opcode/job possible statuses with another one,
waiting, which shows that the LU is in the acquire locks phase. The
mechanism for doing so is simple, we initialize (in the job queue) the
opcode with OP_STATUS_WAITLOCK, and when the processor is ready to give
control to the LU's Exec, it will call a notifier back into the
_JobQueueWorker that sets the opcode status to OP_STATUS_RUNNING (with
the proper queue locking). Because this mechanism does not save the job,
all opcodes on disk will be in status WAITLOCK and not RUNNING anymore,
so we also change the load sequence to consider WAITLOCK as RUNNING.

With the patch applied, creating in parallel (via burnin) five instances
on a five node cluster shows that only two are executing, while three
are waiting for locks.

Reviewed-by: imsnah

daemons/ganeti-masterd
lib/constants.py
lib/jqueue.py
lib/mcpu.py
scripts/gnt-job

index 7ba175d..f49bc43 100755 (executable)
@@ -261,7 +261,7 @@ class ClientOps:
     """
     proc = mcpu.Processor(self.server.context)
     # TODO: Where should log messages go?
-    return proc.ExecOpCode(op, self._DummyLog)
+    return proc.ExecOpCode(op, self._DummyLog, None)
 
 
 class GanetiContext(object):
index 424fbb5..afd40c7 100644 (file)
@@ -304,12 +304,14 @@ JOB_NOTCHANGED = "nochange"
 
 # Job status
 JOB_STATUS_QUEUED = "queued"
+JOB_STATUS_WAITLOCK = "waiting"
 JOB_STATUS_RUNNING = "running"
 JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
 
 OP_STATUS_QUEUED = "queued"
+OP_STATUS_WAITLOCK = "waiting"
 OP_STATUS_RUNNING = "running"
 OP_STATUS_CANCELED = "canceled"
 OP_STATUS_SUCCESS = "success"
index e596434..a4c7b1d 100644 (file)
@@ -159,6 +159,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
+      elif op.status == constants.OP_STATUS_WAITLOCK:
+        status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_ERROR:
@@ -188,6 +190,24 @@ class _QueuedJob(object):
 
 
 class _JobQueueWorker(workerpool.BaseWorker):
+  def _NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the
+    LU is finally about to start the Exec() method. Of course, to have
+    end-user visible results, the opcode must be initially (before
+    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+    """
+    assert self.queue, "Queue attribute is missing"
+    assert self.opcode, "Opcode attribute is missing"
+
+    self.queue.acquire()
+    try:
+      self.opcode.status = constants.OP_STATUS_RUNNING
+    finally:
+      self.queue.release()
+
   def RunTask(self, job):
     """Job executor.
 
@@ -198,7 +218,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     logging.debug("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
-    queue = job.queue
+    self.queue = queue = job.queue
     try:
       try:
         count = len(job.ops)
@@ -209,7 +229,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
             queue.acquire()
             try:
               job.run_op_index = idx
-              op.status = constants.OP_STATUS_RUNNING
+              op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
               if idx == 0: # first opcode
@@ -246,7 +266,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
                 queue.release()
 
             # Make sure not to hold lock while _Log is called
-            result = proc.ExecOpCode(input_opcode, _Log)
+            self.opcode = op
+            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
 
             queue.acquire()
             try:
@@ -365,7 +386,8 @@ class JobQueue(object):
         if status in (constants.JOB_STATUS_QUEUED, ):
           self._wpool.AddTask(job)
 
-        elif status in (constants.JOB_STATUS_RUNNING, ):
+        elif status in (constants.JOB_STATUS_RUNNING,
+                        constants.JOB_STATUS_WAITLOCK):
           logging.warning("Unfinished job %s found: %s", job.id, job)
           try:
             for op in job.ops:
@@ -621,7 +643,8 @@ class JobQueue(object):
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
       if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING):
+                        constants.JOB_STATUS_RUNNING,
+                        constants.JOB_STATUS_WAITLOCK):
         # Don't even try to wait if the job is no longer running, there will be
         # no changes.
         break
index 2f60fd7..af0202a 100644 (file)
@@ -131,6 +131,8 @@ class Processor(object):
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
     if level not in locking.LEVELS:
+      if callable(self._run_notifier):
+        self._run_notifier()
       result = self._ExecLU(lu)
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
@@ -170,11 +172,18 @@ class Processor(object):
 
     return result
 
-  def ExecOpCode(self, op, feedback_fn):
+  def ExecOpCode(self, op, feedback_fn, run_notifier):
     """Execute an opcode.
 
-    Args:
-      op: the opcode to be executed
+    @type op: an OpCode instance
+    @param op: the opcode to be executed
+    @type feedback_fn: a function that takes a single argument
+    @param feedback_fn: this function will be used as feedback from the LU
+                        code to the end-user
+    @type run_notifier: callable (no arguments) or None
+    @param run_notifier:  this function (if callable) will be called when
+                          we are about to call the lu's Exec() method, that
+                          is, after we have aquired all locks
 
     """
     if not isinstance(op, opcodes.OpCode):
@@ -182,6 +191,7 @@ class Processor(object):
                                    " to ExecOpcode")
 
     self._feedback_fn = feedback_fn
+    self._run_notifier = run_notifier
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
       raise errors.OpCodeUnknown("Unknown opcode")
index ab00234..d9a2253 100755 (executable)
@@ -38,6 +38,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"]
 
 _USER_JOB_STATUS = {
   constants.JOB_STATUS_QUEUED: "queued",
+  constants.JOB_STATUS_WAITLOCK: "waiting",
   constants.JOB_STATUS_RUNNING: "running",
   constants.JOB_STATUS_CANCELED: "canceled",
   constants.JOB_STATUS_SUCCESS: "success",