"""
proc = mcpu.Processor(self.server.context)
# TODO: Where should log messages go?
- return proc.ExecOpCode(op, self._DummyLog)
+ return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
# Job status
JOB_STATUS_QUEUED = "queued"
+JOB_STATUS_WAITLOCK = "waiting"
JOB_STATUS_RUNNING = "running"
JOB_STATUS_CANCELED = "canceled"
JOB_STATUS_SUCCESS = "success"
JOB_STATUS_ERROR = "error"
OP_STATUS_QUEUED = "queued"
+OP_STATUS_WAITLOCK = "waiting"
OP_STATUS_RUNNING = "running"
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
if op.status == constants.OP_STATUS_QUEUED:
pass
+ elif op.status == constants.OP_STATUS_WAITLOCK:
+ status = constants.JOB_STATUS_WAITLOCK
elif op.status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
elif op.status == constants.OP_STATUS_ERROR:
class _JobQueueWorker(workerpool.BaseWorker):
+ def _NotifyStart(self):
+ """Mark the opcode as running, not lock-waiting.
+
+ This is called from the mcpu code as a notifier function, when the
+ LU is finally about to start the Exec() method. Of course, to have
+ end-user visible results, the opcode must be initially (before
+ calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+ """
+ assert self.queue, "Queue attribute is missing"
+ assert self.opcode, "Opcode attribute is missing"
+
+ self.queue.acquire()
+ try:
+ self.opcode.status = constants.OP_STATUS_RUNNING
+ finally:
+ self.queue.release()
+
def RunTask(self, job):
"""Job executor.
logging.debug("Worker %s processing job %s",
self.worker_id, job.id)
proc = mcpu.Processor(self.pool.queue.context)
- queue = job.queue
+ self.queue = queue = job.queue
try:
try:
count = len(job.ops)
queue.acquire()
try:
job.run_op_index = idx
- op.status = constants.OP_STATUS_RUNNING
+ op.status = constants.OP_STATUS_WAITLOCK
op.result = None
op.start_timestamp = TimeStampNow()
if idx == 0: # first opcode
queue.release()
# Make sure not to hold lock while _Log is called
- result = proc.ExecOpCode(input_opcode, _Log)
+ self.opcode = op
+ result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
queue.acquire()
try:
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask(job)
- elif status in (constants.JOB_STATUS_RUNNING, ):
+ elif status in (constants.JOB_STATUS_RUNNING,
+ constants.JOB_STATUS_WAITLOCK):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
for op in job.ops:
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
if status not in (constants.JOB_STATUS_QUEUED,
- constants.JOB_STATUS_RUNNING):
+ constants.JOB_STATUS_RUNNING,
+ constants.JOB_STATUS_WAITLOCK):
# Don't even try to wait if the job is no longer running, there will be
# no changes.
break
adding_locks = level in lu.add_locks
acquiring_locks = level in lu.needed_locks
if level not in locking.LEVELS:
+ if callable(self._run_notifier):
+ self._run_notifier()
result = self._ExecLU(lu)
elif adding_locks and acquiring_locks:
# We could both acquire and add locks at the same level, but for now we
return result
- def ExecOpCode(self, op, feedback_fn):
+ def ExecOpCode(self, op, feedback_fn, run_notifier):
"""Execute an opcode.
- Args:
- op: the opcode to be executed
+ @type op: an OpCode instance
+ @param op: the opcode to be executed
+ @type feedback_fn: a function that takes a single argument
+ @param feedback_fn: this function will be used as feedback from the LU
+ code to the end-user
+ @type run_notifier: callable (no arguments) or None
+ @param run_notifier: this function (if callable) will be called when
+ we are about to call the lu's Exec() method, that
+ is, after we have aquired all locks
"""
if not isinstance(op, opcodes.OpCode):
" to ExecOpcode")
self._feedback_fn = feedback_fn
+ self._run_notifier = run_notifier
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
_USER_JOB_STATUS = {
constants.JOB_STATUS_QUEUED: "queued",
+ constants.JOB_STATUS_WAITLOCK: "waiting",
constants.JOB_STATUS_RUNNING: "running",
constants.JOB_STATUS_CANCELED: "canceled",
constants.JOB_STATUS_SUCCESS: "success",