X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/4d5fe81be33a5f75e379072fa65ee6ff512653c5..69b999879d380c6c8ea2e8871085ffe46a13779b:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index ebfb968..91a721a 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -145,20 +145,18 @@ class _QueuedJob(object): @ivar id: the job ID @type ops: list @ivar ops: the list of _QueuedOpCode that constitute the job - @type run_op_index: int - @ivar run_op_index: the currently executing opcode, or -1 if - we didn't yet start executing @type log_serial: int @ivar log_serial: holds the index for the next log entry @ivar received_timestamp: the timestamp for when the job was received @ivar start_timestmap: the timestamp for start of execution @ivar end_timestamp: the timestamp for end of execution + @ivar lock_status: In-memory locking information for debugging @ivar change: a Condition variable we use for waiting for job changes """ - __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial", + __slots__ = ["queue", "id", "ops", "log_serial", "received_timestamp", "start_timestamp", "end_timestamp", - "change", + "lock_status", "change", "__weakref__"] def __init__(self, queue, job_id, ops): @@ -180,12 +178,14 @@ class _QueuedJob(object): self.queue = queue self.id = job_id self.ops = [_QueuedOpCode(op) for op in ops] - self.run_op_index = -1 self.log_serial = 0 self.received_timestamp = TimeStampNow() self.start_timestamp = None self.end_timestamp = None + # In-memory attributes + self.lock_status = None + # Condition to wait for changes self.change = threading.Condition(self.queue._lock) @@ -204,11 +204,13 @@ class _QueuedJob(object): obj = _QueuedJob.__new__(cls) obj.queue = queue obj.id = state["id"] - obj.run_op_index = state["run_op_index"] obj.received_timestamp = state.get("received_timestamp", None) obj.start_timestamp = state.get("start_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) + # In-memory attributes + obj.lock_status = None + obj.ops = [] obj.log_serial = 0 for op_state in state["ops"]: @@ -232,7 +234,6 @@ class _QueuedJob(object): return { "id": self.id, "ops": [op.Serialize() for op in self.ops], - "run_op_index": self.run_op_index, "start_timestamp": self.start_timestamp, "end_timestamp": self.end_timestamp, "received_timestamp": self.received_timestamp, @@ -334,35 +335,90 @@ class _QueuedJob(object): not_marked = False -class _JobQueueWorker(workerpool.BaseWorker): - """The actual job workers. +class _OpExecCallbacks(mcpu.OpExecCbBase): + def __init__(self, queue, job, op): + """Initializes this class. - """ - def _NotifyStart(self): + @type queue: L{JobQueue} + @param queue: Job queue + @type job: L{_QueuedJob} + @param job: Job object + @type op: L{_QueuedOpCode} + @param op: OpCode + + """ + assert queue, "Queue is missing" + assert job, "Job is missing" + assert op, "Opcode is missing" + + self._queue = queue + self._job = job + self._op = op + + def NotifyStart(self): """Mark the opcode as running, not lock-waiting. - This is called from the mcpu code as a notifier function, when the - LU is finally about to start the Exec() method. Of course, to have - end-user visible results, the opcode must be initially (before - calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. + This is called from the mcpu code as a notifier function, when the LU is + finally about to start the Exec() method. Of course, to have end-user + visible results, the opcode must be initially (before calling into + Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. """ - assert self.queue, "Queue attribute is missing" - assert self.opcode, "Opcode attribute is missing" - - self.queue.acquire() + self._queue.acquire() try: - assert self.opcode.status in (constants.OP_STATUS_WAITLOCK, - constants.OP_STATUS_CANCELING) + assert self._op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + + # All locks are acquired by now + self._job.lock_status = None # Cancel here if we were asked to - if self.opcode.status == constants.OP_STATUS_CANCELING: + if self._op.status == constants.OP_STATUS_CANCELING: raise CancelJob() - self.opcode.status = constants.OP_STATUS_RUNNING + self._op.status = constants.OP_STATUS_RUNNING + finally: + self._queue.release() + + def Feedback(self, *args): + """Append a log entry. + + """ + assert len(args) < 3 + + if len(args) == 1: + log_type = constants.ELOG_MESSAGE + log_msg = args[0] + else: + (log_type, log_msg) = args + + # The time is split to make serialization easier and not lose + # precision. + timestamp = utils.SplitTime(time.time()) + + self._queue.acquire() + try: + self._job.log_serial += 1 + self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) + + self._job.change.notifyAll() finally: - self.queue.release() + self._queue.release() + + def ReportLocks(self, msg): + """Write locking information to the job. + + Called whenever the LU processor is waiting for a lock or has acquired one. + + """ + # Not getting the queue lock because this is a single assignment + self._job.lock_status = msg + +class _JobQueueWorker(workerpool.BaseWorker): + """The actual job workers. + + """ def RunTask(self, job): """Job executor. @@ -376,7 +432,7 @@ class _JobQueueWorker(workerpool.BaseWorker): logging.info("Worker %s processing job %s", self.worker_id, job.id) proc = mcpu.Processor(self.pool.queue.context) - self.queue = queue = job.queue + queue = job.queue try: try: count = len(job.ops) @@ -400,7 +456,6 @@ class _JobQueueWorker(workerpool.BaseWorker): if op.status == constants.OP_STATUS_CANCELED: raise CancelJob() assert op.status == constants.OP_STATUS_QUEUED - job.run_op_index = idx op.status = constants.OP_STATUS_WAITLOCK op.result = None op.start_timestamp = TimeStampNow() @@ -412,34 +467,9 @@ class _JobQueueWorker(workerpool.BaseWorker): finally: queue.release() - def _Log(*args): - """Append a log entry. - - """ - assert len(args) < 3 - - if len(args) == 1: - log_type = constants.ELOG_MESSAGE - log_msg = args[0] - else: - log_type, log_msg = args - - # The time is split to make serialization easier and not lose - # precision. - timestamp = utils.SplitTime(time.time()) - - queue.acquire() - try: - job.log_serial += 1 - op.log.append((job.log_serial, timestamp, log_type, log_msg)) - - job.change.notifyAll() - finally: - queue.release() - - # Make sure not to hold lock while _Log is called - self.opcode = op - result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) + # Make sure not to hold queue lock while calling ExecOpCode + result = proc.ExecOpCode(input_opcode, + _OpExecCallbacks(queue, job, op)) queue.acquire() try: @@ -487,7 +517,7 @@ class _JobQueueWorker(workerpool.BaseWorker): queue.acquire() try: try: - job.run_op_index = -1 + job.lock_status = None job.end_timestamp = TimeStampNow() queue.UpdateJobUnlocked(job) finally: @@ -495,6 +525,7 @@ class _JobQueueWorker(workerpool.BaseWorker): status = job.CalcStatus() finally: queue.release() + logging.info("Worker %s finished job %s, status = %s", self.worker_id, job_id, status) @@ -642,7 +673,7 @@ class JobQueue(object): # Clean queue directory on added node result = rpc.RpcRunner.call_jobqueue_purge(node_name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: logging.warning("Cannot cleanup queue directory on node %s: %s", node_name, msg) @@ -666,7 +697,7 @@ class JobQueue(object): result = rpc.RpcRunner.call_jobqueue_update([node_name], [node.primary_ip], file_name, content) - msg = result[node_name].RemoteFailMsg() + msg = result[node_name].fail_msg if msg: logging.error("Failed to upload file %s to node %s: %s", file_name, node_name, msg) @@ -706,7 +737,7 @@ class JobQueue(object): success = [] for node in nodes: - msg = result[node].RemoteFailMsg() + msg = result[node].fail_msg if msg: failed.append(node) logging.error("RPC call %s failed on node %s: %s", @@ -996,7 +1027,7 @@ class JobQueue(object): queue, in order for it to be picked up by the queue processors. @type job_id: job ID - @param jod_id: the job ID for the new job + @param job_id: the job ID for the new job @type ops: list @param ops: The list of OpCodes that will become the new job. @rtype: job ID @@ -1063,7 +1094,6 @@ class JobQueue(object): return results - @_RequireOpenQueue def UpdateJobUnlocked(self, job): """Update a job's on disk storage. @@ -1357,6 +1387,8 @@ class JobQueue(object): row.append(job.start_timestamp) elif fname == "end_ts": row.append(job.end_timestamp) + elif fname == "lock_status": + row.append(job.lock_status) elif fname == "summary": row.append([op.input.Summary() for op in job.ops]) else: