From f1048938a0e48c0c81818b928e2e32287a3f871f Mon Sep 17 00:00:00 2001 From: Iustin Pop Date: Mon, 14 Jul 2008 13:15:58 +0000 Subject: [PATCH] First version of user feedback fixes This patch contains a raw version for fixing feedback_fn. The new mechanism works as follows: - instead of a per-Processor feedback_fn, there's one for each ExecOpCode, so that feedback for different opcodes goes via possibly different functions - each _QueuedOpCode gets a message buffer, a method for adding feedback and a method for retrieving (parts of) the feedback - the _QueuedJob object gets a new attribute that is equal to the index of the currently executing opcode - job queries get an extra field called 'ticker' that will return the latest message of the currently executing opcode - the cli.py job completion poll will show the new message if it is different from the last one shown Of course, quick messages will be lost, as currently only the latest one is available. Also, changes between opcodes are not represented at all. Reviewed-by: imsnah --- lib/cli.py | 7 +++++- lib/constants.py | 4 +++ lib/jqueue.py | 73 ++++++++++++++++++++++++++++++++++++++++++++++-------- lib/mcpu.py | 7 +++--- 4 files changed, 77 insertions(+), 14 deletions(-) diff --git a/lib/cli.py b/lib/cli.py index 1f89be3..34a2bfb 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -382,8 +382,9 @@ def SubmitOpCode(op, proc=None, feedback_fn=None): job_id = cl.SubmitJob([op]) + lastmsg = None while True: - jobs = cl.QueryJobs([job_id], ["status"]) + jobs = cl.QueryJobs([job_id], ["status", "ticker"]) if not jobs: # job not found, go away! 
raise errors.JobLost("Job with id %s lost" % job_id) @@ -392,6 +393,10 @@ def SubmitOpCode(op, proc=None, feedback_fn=None): status = jobs[0][0] if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR): break + msg = jobs[0][1] + if msg is not None and msg != lastmsg: + print "%s %s" % (time.ctime(msg[0]), msg[2]) + lastmsg = msg time.sleep(1) jobs = cl.QueryJobs([job_id], ["status", "opresult"]) diff --git a/lib/constants.py b/lib/constants.py index d3a8c10..6b00e6b 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -262,3 +262,7 @@ OP_STATUS_QUEUED = "queued" OP_STATUS_RUNNING = "running" OP_STATUS_SUCCESS = "success" OP_STATUS_ERROR = "error" + +# Execution log types +ELOG_MESSAGE = "message" +ELOG_PROGRESS = "progress" diff --git a/lib/jqueue.py b/lib/jqueue.py index 0b86f33..950992e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -26,6 +26,7 @@ import logging import threading import errno import re +import time from ganeti import constants from ganeti import serializer @@ -44,21 +45,25 @@ class _QueuedOpCode(object): Access is synchronized by the '_lock' attribute. + The 'log' attribute holds the execution log and consists of tuples + of the form (timestamp, level, message). 
+ """ def __init__(self, op): - self.__Setup(op, constants.OP_STATUS_QUEUED, None) + self.__Setup(op, constants.OP_STATUS_QUEUED, None, []) - def __Setup(self, input, status, result): + def __Setup(self, input_, status, result, log): self._lock = threading.Lock() - self.input = input + self.input = input_ self.status = status self.result = result + self.log = log @classmethod def Restore(cls, state): obj = object.__new__(cls) obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]), - state["status"], state["result"]) + state["status"], state["result"], state["log"]) return obj @utils.LockedMethod @@ -67,6 +72,7 @@ class _QueuedOpCode(object): "input": self.input.__getstate__(), "status": self.status, "result": self.result, + "log": self.log, } @utils.LockedMethod @@ -98,6 +104,27 @@ class _QueuedOpCode(object): """ return self.result + @utils.LockedMethod + def Log(self, *args): + """Append a log entry. + + """ + assert len(args) < 2 + + if len(args) == 1: + log_type = constants.ELOG_MESSAGE + log_msg = args[0] + else: + log_type, log_msg = args + self.log.append((time.time(), log_type, log_msg)) + + @utils.LockedMethod + def RetrieveLog(self, start_at=0): + """Retrieve (a part of) the execution log. + + """ + return self.log[start_at:] + class _QueuedJob(object): """In-memory job representation. 
@@ -110,24 +137,27 @@ class _QueuedJob(object): # TODO raise Exception("No opcodes") - self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops]) + self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1) - def __Setup(self, storage, job_id, ops): + def __Setup(self, storage, job_id, ops, run_op_index): + self._lock = threading.Lock() self.storage = storage self.id = job_id self._ops = ops + self.run_op_index = run_op_index @classmethod def Restore(cls, storage, state): obj = object.__new__(cls) - obj.__Setup(storage, state["id"], - [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]) + op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]] + obj.__Setup(storage, state["id"], op_list, state["run_op_index"]) return obj def Serialize(self): return { "id": self.id, "ops": [op.Serialize() for op in self._ops], + "run_op_index": self.run_op_index, } def SetUnclean(self, msg): @@ -162,6 +192,10 @@ class _QueuedJob(object): return status + @utils.LockedMethod + def GetRunOpIndex(self): + return self.run_op_index + def Run(self, proc): """Job executor. 
@@ -177,10 +211,17 @@ class _QueuedJob(object): for idx, op in enumerate(self._ops): try: logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + + self._lock.acquire() + try: + self.run_op_index = idx + finally: + self._lock.release() + op.SetStatus(constants.OP_STATUS_RUNNING, None) self.storage.UpdateJob(self) - result = proc.ExecOpCode(op.input) + result = proc.ExecOpCode(op.input, op.Log) op.SetStatus(constants.OP_STATUS_SUCCESS, result) self.storage.UpdateJob(self) @@ -207,7 +248,7 @@ class _JobQueueWorker(workerpool.BaseWorker): logging.debug("Worker %s processing job %s", self.worker_id, job.id) # TODO: feedback function - proc = mcpu.Processor(self.pool.context, feedback=lambda x: None) + proc = mcpu.Processor(self.pool.context) try: job.Run(proc) finally: @@ -477,6 +518,18 @@ class JobQueue: row.append([op.GetResult() for op in job._ops]) elif fname == "opstatus": row.append([op.GetStatus() for op in job._ops]) + elif fname == "ticker": + ji = job.GetRunOpIndex() + if ji < 0: + lmsg = None + else: + lmsg = job._ops[ji].RetrieveLog(-1) + # message might be empty here + if lmsg: + lmsg = lmsg[0] + else: + lmsg = None + row.append(lmsg) else: raise errors.OpExecError("Invalid job query field '%s'" % fname) return row diff --git a/lib/mcpu.py b/lib/mcpu.py index 4bdfb46..f9e1d84 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -90,7 +90,7 @@ class Processor(object): opcodes.OpTestAllocator: cmdlib.LUTestAllocator, } - def __init__(self, context, feedback=None): + def __init__(self, context): """Constructor for Processor Args: @@ -98,7 +98,7 @@ class Processor(object): interesting events are happening """ self.context = context - self._feedback_fn = feedback + self._feedback_fn = None self.exclusive_BGL = False def _ExecLU(self, lu): @@ -146,7 +146,7 @@ class Processor(object): return result - def ExecOpCode(self, op): + def ExecOpCode(self, op, feedback_fn): """Execute an opcode. 
Args: @@ -157,6 +157,7 @@ class Processor(object): raise errors.ProgrammerError("Non-opcode instance passed" " to ExecOpcode") + self._feedback_fn = feedback_fn lu_class = self.DISPATCH_TABLE.get(op.__class__, None) if lu_class is None: raise errors.OpCodeUnknown("Unknown opcode") -- 1.7.10.4