import threading
import errno
import re
+import time
from ganeti import constants
from ganeti import serializer
Access is synchronized by the '_lock' attribute.
+ The 'log' attribute holds the execution log and consists of tuples
+ of the form (timestamp, level, message).
+
"""
def __init__(self, op):
- self.__Setup(op, constants.OP_STATUS_QUEUED, None)
+ self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
- def __Setup(self, input, status, result):
+ def __Setup(self, input_, status, result, log):
self._lock = threading.Lock()
- self.input = input
+ self.input = input_
self.status = status
self.result = result
+ self.log = log
@classmethod
def Restore(cls, state):
obj = object.__new__(cls)
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
- state["status"], state["result"])
+ state["status"], state["result"], state["log"])
return obj
@utils.LockedMethod
"input": self.input.__getstate__(),
"status": self.status,
"result": self.result,
+ "log": self.log,
}
@utils.LockedMethod
"""
return self.result
+ @utils.LockedMethod
+ def Log(self, *args):
+ """Append a log entry.
+
+ """
+ assert len(args) < 2
+
+ if len(args) == 1:
+ log_type = constants.ELOG_MESSAGE
+ log_msg = args[0]
+ else:
+ log_type, log_msg = args
+ self.log.append((time.time(), log_type, log_msg))
+
+ @utils.LockedMethod
+ def RetrieveLog(self, start_at=0):
+ """Retrieve (a part of) the execution log.
+
+ """
+ return self.log[start_at:]
+
class _QueuedJob(object):
"""In-memory job representation.
# TODO
raise Exception("No opcodes")
- self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
+ self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
- def __Setup(self, storage, job_id, ops):
+ def __Setup(self, storage, job_id, ops, run_op_index):
+ self._lock = threading.Lock()
self.storage = storage
self.id = job_id
self._ops = ops
+ self.run_op_index = run_op_index
@classmethod
def Restore(cls, storage, state):
obj = object.__new__(cls)
- obj.__Setup(storage, state["id"],
- [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
+ op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
+ obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
return obj
def Serialize(self):
return {
"id": self.id,
"ops": [op.Serialize() for op in self._ops],
+ "run_op_index": self.run_op_index,
}
def SetUnclean(self, msg):
return status
+ @utils.LockedMethod
+ def GetRunOpIndex(self):
+ return self.run_op_index
+
def Run(self, proc):
"""Job executor.
for idx, op in enumerate(self._ops):
try:
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+
+ self._lock.acquire()
+ try:
+ self.run_op_index = idx
+ finally:
+ self._lock.release()
+
op.SetStatus(constants.OP_STATUS_RUNNING, None)
self.storage.UpdateJob(self)
- result = proc.ExecOpCode(op.input)
+ result = proc.ExecOpCode(op.input, op.Log)
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
self.storage.UpdateJob(self)
logging.debug("Worker %s processing job %s",
self.worker_id, job.id)
# TODO: feedback function
- proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
+ proc = mcpu.Processor(self.pool.context)
try:
job.Run(proc)
finally:
row.append([op.GetResult() for op in job._ops])
elif fname == "opstatus":
row.append([op.GetStatus() for op in job._ops])
+ elif fname == "ticker":
+ ji = job.GetRunOpIndex()
+ if ji < 0:
+ lmsg = None
+ else:
+ lmsg = job._ops[ji].RetrieveLog(-1)
+ # message might be empty here
+ if lmsg:
+ lmsg = lmsg[0]
+ else:
+ lmsg = None
+ row.append(lmsg)
else:
raise errors.OpExecError("Invalid job query field '%s'" % fname)
return row
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
}
- def __init__(self, context, feedback=None):
+ def __init__(self, context):
"""Constructor for Processor
Args:
interesting events are happening
"""
self.context = context
- self._feedback_fn = feedback
+ self._feedback_fn = None
self.exclusive_BGL = False
def _ExecLU(self, lu):
return result
- def ExecOpCode(self, op):
+ def ExecOpCode(self, op, feedback_fn):
"""Execute an opcode.
Args:
raise errors.ProgrammerError("Non-opcode instance passed"
" to ExecOpcode")
+ self._feedback_fn = feedback_fn
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")