First version of user feedback fixes
authorIustin Pop <iustin@google.com>
Mon, 14 Jul 2008 13:15:58 +0000 (13:15 +0000)
committerIustin Pop <iustin@google.com>
Mon, 14 Jul 2008 13:15:58 +0000 (13:15 +0000)
This patch contains a raw version for fixing feedback_fn.

The new mechanism works as follows:
  - instead of a per-Processor feedback_fn, there's one for each
    ExecOpCode, so that feedback for different opcodes go via possibly
    different functions
  - each _QueuedOpCode gets a message buffer, a method for adding
    feedback and a method for retrieving (parts of) the feedback
  - the _QueuedJob object gets a new attribute that is equal to the
    index of the currently executing opcode
  - job queries get an extra parameter called 'ticker' that will return
    the latest message on the current executing opcode
  - the cli.py job completion poll will show the new status if different
    from the old one

Of course, quick messages will be lost, as currently only the latest one
is available. Also changes between opcodes are not represented at all.

Reviewed-by: imsnah

lib/cli.py
lib/constants.py
lib/jqueue.py
lib/mcpu.py

index 1f89be3..34a2bfb 100644 (file)
@@ -382,8 +382,9 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
 
   job_id = cl.SubmitJob([op])
 
+  lastmsg = None
   while True:
-    jobs = cl.QueryJobs([job_id], ["status"])
+    jobs = cl.QueryJobs([job_id], ["status", "ticker"])
     if not jobs:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
@@ -392,6 +393,10 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
     status = jobs[0][0]
     if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
+    msg = jobs[0][1]
+    if msg is not None and msg != lastmsg:
+      print "%s %s" % (time.ctime(msg[0]), msg[2])
+    lastmsg = msg
     time.sleep(1)
 
   jobs = cl.QueryJobs([job_id], ["status", "opresult"])
index d3a8c10..6b00e6b 100644 (file)
@@ -262,3 +262,7 @@ OP_STATUS_QUEUED = "queued"
 OP_STATUS_RUNNING = "running"
 OP_STATUS_SUCCESS = "success"
 OP_STATUS_ERROR = "error"
+
+# Execution log types
+ELOG_MESSAGE = "message"
+ELOG_PROGRESS = "progress"
index 0b86f33..950992e 100644 (file)
@@ -26,6 +26,7 @@ import logging
 import threading
 import errno
 import re
+import time
 
 from ganeti import constants
 from ganeti import serializer
@@ -44,21 +45,25 @@ class _QueuedOpCode(object):
 
   Access is synchronized by the '_lock' attribute.
 
+  The 'log' attribute holds the execution log and consists of tuples
+  of the form (timestamp, level, message).
+
   """
   def __init__(self, op):
-    self.__Setup(op, constants.OP_STATUS_QUEUED, None)
+    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
 
-  def __Setup(self, input, status, result):
+  def __Setup(self, input_, status, result, log):
     self._lock = threading.Lock()
-    self.input = input
+    self.input = input_
     self.status = status
     self.result = result
+    self.log = log
 
   @classmethod
   def Restore(cls, state):
     obj = object.__new__(cls)
     obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
-                state["status"], state["result"])
+                state["status"], state["result"], state["log"])
     return obj
 
   @utils.LockedMethod
@@ -67,6 +72,7 @@ class _QueuedOpCode(object):
       "input": self.input.__getstate__(),
       "status": self.status,
       "result": self.result,
+      "log": self.log,
       }
 
   @utils.LockedMethod
@@ -98,6 +104,27 @@ class _QueuedOpCode(object):
     """
     return self.result
 
+  @utils.LockedMethod
+  def Log(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 2
+
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      log_type, log_msg = args
+    self.log.append((time.time(), log_type, log_msg))
+
+  @utils.LockedMethod
+  def RetrieveLog(self, start_at=0):
+    """Retrieve (a part of) the execution log.
+
+    """
+    return self.log[start_at:]
+
 
 class _QueuedJob(object):
   """In-memory job representation.
@@ -110,24 +137,27 @@ class _QueuedJob(object):
       # TODO
       raise Exception("No opcodes")
 
-    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
+    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
 
-  def __Setup(self, storage, job_id, ops):
+  def __Setup(self, storage, job_id, ops, run_op_index):
+    self._lock = threading.Lock()
     self.storage = storage
     self.id = job_id
     self._ops = ops
+    self.run_op_index = run_op_index
 
   @classmethod
   def Restore(cls, storage, state):
     obj = object.__new__(cls)
-    obj.__Setup(storage, state["id"],
-                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
+    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
+    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
     return obj
 
   def Serialize(self):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self._ops],
+      "run_op_index": self.run_op_index,
       }
 
   def SetUnclean(self, msg):
@@ -162,6 +192,10 @@ class _QueuedJob(object):
 
     return status
 
+  @utils.LockedMethod
+  def GetRunOpIndex(self):
+    return self.run_op_index
+
   def Run(self, proc):
     """Job executor.
 
@@ -177,10 +211,17 @@ class _QueuedJob(object):
       for idx, op in enumerate(self._ops):
         try:
           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+
+          self._lock.acquire()
+          try:
+            self.run_op_index = idx
+          finally:
+            self._lock.release()
+
           op.SetStatus(constants.OP_STATUS_RUNNING, None)
           self.storage.UpdateJob(self)
 
-          result = proc.ExecOpCode(op.input)
+          result = proc.ExecOpCode(op.input, op.Log)
 
           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
           self.storage.UpdateJob(self)
@@ -207,7 +248,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     logging.debug("Worker %s processing job %s",
                   self.worker_id, job.id)
     # TODO: feedback function
-    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
+    proc = mcpu.Processor(self.pool.context)
     try:
       job.Run(proc)
     finally:
@@ -477,6 +518,18 @@ class JobQueue:
         row.append([op.GetResult() for op in job._ops])
       elif fname == "opstatus":
         row.append([op.GetStatus() for op in job._ops])
+      elif fname == "ticker":
+        ji = job.GetRunOpIndex()
+        if ji < 0:
+          lmsg = None
+        else:
+          lmsg = job._ops[ji].RetrieveLog(-1)
+          # message might be empty here
+          if lmsg:
+            lmsg = lmsg[0]
+          else:
+            lmsg = None
+        row.append(lmsg)
       else:
         raise errors.OpExecError("Invalid job query field '%s'" % fname)
     return row
index 4bdfb46..f9e1d84 100644 (file)
@@ -90,7 +90,7 @@ class Processor(object):
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
     }
 
-  def __init__(self, context, feedback=None):
+  def __init__(self, context):
     """Constructor for Processor
 
     Args:
@@ -98,7 +98,7 @@ class Processor(object):
                     interesting events are happening
     """
     self.context = context
-    self._feedback_fn = feedback
+    self._feedback_fn = None
     self.exclusive_BGL = False
 
   def _ExecLU(self, lu):
@@ -146,7 +146,7 @@ class Processor(object):
 
     return result
 
-  def ExecOpCode(self, op):
+  def ExecOpCode(self, op, feedback_fn):
     """Execute an opcode.
 
     Args:
@@ -157,6 +157,7 @@ class Processor(object):
       raise errors.ProgrammerError("Non-opcode instance passed"
                                    " to ExecOpcode")
 
+    self._feedback_fn = feedback_fn
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
       raise errors.OpCodeUnknown("Unknown opcode")