X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b80cc51813254199e0b30e2bbe354374110cbd17..dd7f6776235601c60ce4257c710a2bc0be80a206:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index c7fd4c7..56f7a66 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -29,7 +29,6 @@ used by all other classes in this module. """ -import os import logging import errno import re @@ -176,7 +175,7 @@ class _QueuedJob(object): """ # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", "__weakref__"] @@ -211,6 +210,7 @@ class _QueuedJob(object): """ obj.ops_iter = None + obj.cur_opctx = None def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), @@ -375,6 +375,8 @@ class _QueuedJob(object): row.append(self.id) elif fname == "status": row.append(self.CalcStatus()) + elif fname == "priority": + row.append(self.CalcPriority()) elif fname == "ops": row.append([op.input.__getstate__() for op in self.ops]) elif fname == "opresult": @@ -389,6 +391,8 @@ class _QueuedJob(object): row.append([op.exec_timestamp for op in self.ops]) elif fname == "opend": row.append([op.end_timestamp for op in self.ops]) + elif fname == "oppriority": + row.append([op.priority for op in self.ops]) elif fname == "received_ts": row.append(self.received_timestamp) elif fname == "start_ts": @@ -431,22 +435,19 @@ class _QueuedJob(object): """ status = self.CalcStatus() - if status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_WAITLOCK): - logging.debug("Job %s is no longer waiting in the queue", self.id) - return (False, "Job %s is no longer waiting in the queue" % self.id) - if status == constants.JOB_STATUS_QUEUED: self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, "Job canceled by request") - msg = "Job %s canceled" % self.id + return (True, "Job %s canceled" % self.id) elif status == constants.JOB_STATUS_WAITLOCK: # The worker will notice the new status and cancel the job self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) - msg = "Job %s will be canceled" % self.id + return (True, "Job %s will be canceled" % self.id) - return (True, msg) + else: + logging.debug("Job %s is no longer waiting in the queue", self.id) + return (False, "Job %s is no longer waiting in the queue" % self.id) class _OpExecCallbacks(mcpu.OpExecCbBase): @@ -745,8 +746,40 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _TimeoutStrategyWrapper: + def __init__(self, fn): + """Initializes this class. + + """ + self._fn = fn + self._next = None + + def _Advance(self): + """Gets the next timeout if necessary. + + """ + if self._next is None: + self._next = self._fn() + + def Peek(self): + """Returns the next timeout. + + """ + self._Advance() + return self._next + + def Next(self): + """Returns the current timeout and advances the internal state. + + """ + self._Advance() + result = self._next + self._next = None + return result + + class _OpExecContext: - def __init__(self, op, index, log_prefix): + def __init__(self, op, index, log_prefix, timeout_strategy_factory): """Initializes this class. """ @@ -755,22 +788,60 @@ class _OpExecContext: self.log_prefix = log_prefix self.summary = op.input.Summary() + self._timeout_strategy_factory = timeout_strategy_factory + self._ResetTimeoutStrategy() + + def _ResetTimeoutStrategy(self): + """Creates a new timeout strategy. 
+ + """ + self._timeout_strategy = \ + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) + + def CheckPriorityIncrease(self): + """Checks whether priority can and should be increased. + + Called when locks couldn't be acquired. + + """ + op = self.op + + # Exhausted all retries and next round should not use blocking acquire + # for locks? + if (self._timeout_strategy.Peek() is None and + op.priority > constants.OP_PRIO_HIGHEST): + logging.debug("Increasing priority") + op.priority -= 1 + self._ResetTimeoutStrategy() + return True + + return False + + def GetNextLockTimeout(self): + """Returns the next lock acquire timeout. + + """ + return self._timeout_strategy.Next() + class _JobProcessor(object): - def __init__(self, queue, opexec_fn, job): + def __init__(self, queue, opexec_fn, job, + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): """Initializes this class. """ self.queue = queue self.opexec_fn = opexec_fn self.job = job + self._timeout_strategy_factory = _timeout_strategy_factory @staticmethod - def _FindNextOpcode(job): + def _FindNextOpcode(job, timeout_strategy_factory): """Locates the next opcode to run. @type job: L{_QueuedJob} @param job: Job object + @param timeout_strategy_factory: Callable to create new timeout strategy """ # Create some sort of a cache to speed up locating next opcode for future @@ -791,7 +862,8 @@ class _JobProcessor(object): # Found an opcode already marked as running raise errors.ProgrammerError("Called for job marked as running") - opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), + timeout_strategy_factory) if op.status == constants.OP_STATUS_CANCELED: # Cancelled jobs are handled by the caller @@ -817,18 +889,33 @@ class _JobProcessor(object): @type job: L{_QueuedJob} @param job: Job object - @type job: L{_QueuedOpCode} - @param job: Opcode object + @type op: L{_QueuedOpCode} + @param op: Opcode object """ assert op in job.ops + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + update = False - op.status = constants.OP_STATUS_WAITLOCK op.result = None - op.start_timestamp = TimeStampNow() + + if op.status == constants.OP_STATUS_QUEUED: + op.status = constants.OP_STATUS_WAITLOCK + update = True + + if op.start_timestamp is None: + op.start_timestamp = TimeStampNow() + update = True if job.start_timestamp is None: job.start_timestamp = op.start_timestamp + update = True + + assert op.status == constants.OP_STATUS_WAITLOCK + + return update def _ExecOpCodeUnlocked(self, opctx): """Processes one opcode and returns the result. @@ -838,10 +925,26 @@ class _JobProcessor(object): assert op.status == constants.OP_STATUS_WAITLOCK + timeout = opctx.GetNextLockTimeout() + try: # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, - _OpExecCallbacks(self.queue, self.job, op)) + _OpExecCallbacks(self.queue, self.job, op), + timeout=timeout, priority=op.priority) + except mcpu.LockAcquireTimeout: + assert timeout is not None, "Received timeout for blocking acquire" + logging.debug("Couldn't acquire locks in %0.6fs", timeout) + + assert op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + + # Was job cancelled while we were waiting for the lock? 
+ if op.status == constants.OP_STATUS_CANCELING: + return (constants.OP_STATUS_CANCELING, None) + + # Stay in waitlock while trying to re-acquire lock + return (constants.OP_STATUS_WAITLOCK, None) except CancelJob: logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING @@ -855,9 +958,10 @@ class _JobProcessor(object): opctx.log_prefix, opctx.summary) return (constants.OP_STATUS_SUCCESS, result) - def __call__(self): + def __call__(self, _nextop_fn=None): """Continues execution of a job. + @param _nextop_fn: Callback function for tests @rtype: bool @return: True if job is finished, False if processor needs to be called again @@ -872,27 +976,44 @@ class _JobProcessor(object): try: opcount = len(job.ops) - opctx = self._FindNextOpcode(job) + # Is a previous opcode still pending? + if job.cur_opctx: + opctx = job.cur_opctx + job.cur_opctx = None + else: + if __debug__ and _nextop_fn: + _nextop_fn() + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) + op = opctx.op # Consistency check assert compat.all(i.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_CANCELING, constants.OP_STATUS_CANCELED) - for i in job.ops[opctx.index:]) + for i in job.ops[opctx.index + 1:]) assert op.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING, constants.OP_STATUS_CANCELED) - if op.status != constants.OP_STATUS_CANCELED: + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + if op.status not in (constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED): + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + # Prepare to start opcode - self._MarkWaitlock(job, op) + if self._MarkWaitlock(job, op): + # Write to disk + queue.UpdateJobUnlocked(job) assert op.status == constants.OP_STATUS_WAITLOCK assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK - - # Write to disk - queue.UpdateJobUnlocked(job) + assert job.start_timestamp and op.start_timestamp logging.info("%s: opcode %s waiting for locks", opctx.log_prefix, opctx.summary) @@ -903,62 +1024,87 @@ class _JobProcessor(object): finally: queue.acquire(shared=1) - # Finalize opcode - op.end_timestamp = TimeStampNow() op.status = op_status op.result = op_result - if op.status == constants.OP_STATUS_CANCELING: - assert not compat.any(i.status != constants.OP_STATUS_CANCELING - for i in job.ops[opctx.index:]) + if op.status == constants.OP_STATUS_WAITLOCK: + # Couldn't get locks in time + assert not op.end_timestamp else: - assert op.status in constants.OPS_FINALIZED + # Finalize opcode + op.end_timestamp = TimeStampNow() - # Ensure all opcodes so far have been successful - assert (opctx.index == 0 or - compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:opctx.index])) + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opctx.index:]) + else: + assert op.status in constants.OPS_FINALIZED - if op.status == constants.OP_STATUS_SUCCESS: + if op.status == constants.OP_STATUS_WAITLOCK: finalize = False - elif op.status == constants.OP_STATUS_ERROR: - # Ensure failed opcode has an exception as its result - assert errors.GetEncodedError(job.ops[opctx.index].result) - - to_encode = errors.OpExecError("Preceding opcode failed") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - finalize = True + if opctx.CheckPriorityIncrease(): + # 
Priority was changed, need to update on-disk file + queue.UpdateJobUnlocked(job) - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_ERROR and - errors.GetEncodedError(i.result) - for i in job.ops[opctx.index:]) + # Keep around for another round + job.cur_opctx = opctx - elif op.status == constants.OP_STATUS_CANCELING: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - finalize = True + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) - elif op.status == constants.OP_STATUS_CANCELED: - finalize = True + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK else: - raise errors.ProgrammerError("Unknown status '%s'" % op.status) + # Ensure all opcodes so far have been successful + assert (opctx.index == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opctx.index])) + + # Reset context + job.cur_opctx = None + + if op.status == constants.OP_STATUS_SUCCESS: + finalize = False - # Finalizing or last opcode? - if finalize or opctx.index == (opcount - 1): - # All opcodes have been run, finalize job - job.end_timestamp = TimeStampNow() + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opctx.index].result) - # Write to disk. If the job status is final, this is the final write - # allowed. Once the file has been written, it can be archived anytime. - queue.UpdateJobUnlocked(job) + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True - if finalize or opctx.index == (opcount - 1): - logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) - return True + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opctx.index:]) + + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + elif op.status == constants.OP_STATUS_CANCELED: + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + # Finalizing or last opcode? + if finalize or opctx.index == (opcount - 1): + # All opcodes have been run, finalize job + job.end_timestamp = TimeStampNow() + + # Write to disk. If the job status is final, this is the final write + # allowed. Once the file has been written, it can be archived anytime. 
+ queue.UpdateJobUnlocked(job) + + if finalize or opctx.index == (opcount - 1): + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return True return False finally: @@ -988,7 +1134,7 @@ class _JobQueueWorker(workerpool.BaseWorker): if not _JobProcessor(queue, proc.ExecOpCode, job)(): # Schedule again - raise workerpool.DeferTask() + raise workerpool.DeferTask(priority=job.CalcPriority()) class _JobQueueWorkerPool(workerpool.WorkerPool): @@ -1083,7 +1229,7 @@ class JobQueue(object): self._queue_size = 0 self._UpdateQueueSizeUnlocked() - self._drained = self._IsQueueMarkedDrain() + self._drained = jstore.CheckDrainFlag() # Setup worker pool self._wpool = _JobQueueWorkerPool(self) @@ -1125,15 +1271,22 @@ class JobQueue(object): status = job.CalcStatus() - if status in (constants.JOB_STATUS_QUEUED, ): + if status == constants.JOB_STATUS_QUEUED: restartjobs.append(job) elif status in (constants.JOB_STATUS_RUNNING, constants.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") + + if status == constants.JOB_STATUS_WAITLOCK: + # Restart job + job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) + restartjobs.append(job) + else: + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") + self.UpdateJobUnlocked(job) if restartjobs: @@ -1475,19 +1628,6 @@ class JobQueue(object): logging.exception("Can't load/parse job %s", job_id) return None - @staticmethod - def _IsQueueMarkedDrain(): - """Check if the queue is marked from drain. - - This currently uses the queue drain file, which makes it a - per-node flag. In the future this can be moved to the config file. - - @rtype: boolean - @return: True of the job queue is marked for draining - - """ - return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) - def _UpdateQueueSizeUnlocked(self): """Update the queue size. @@ -1503,13 +1643,7 @@ class JobQueue(object): @param drain_flag: Whether to set or unset the drain flag """ - getents = runtime.GetEnts() - - if drain_flag: - utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True, - uid=getents.masterd_uid, gid=getents.masterd_gid) - else: - utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) + jstore.SetDrainFlag(drain_flag) self._drained = drain_flag
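The snippet below is not part of the patch; it illustrates the behaviour of the _TimeoutStrategyWrapper class added above. _DemoTimeoutStrategy is a made-up stand-in for mcpu.LockAttemptTimeoutStrategy with arbitrary timeout values; the only property the job processor relies on is that NextAttempt() eventually returns None, which GetNextLockTimeout passes through as "use a blocking acquire".

# Illustration only -- not part of the patch.  _DemoTimeoutStrategy stands
# in for mcpu.LockAttemptTimeoutStrategy; the timeout values are arbitrary.
class _DemoTimeoutStrategy:
  def __init__(self):
    self._timeouts = [0.1, 0.5, 2.0]

  def NextAttempt(self):
    """Returns the next acquire timeout, or None once exhausted."""
    if self._timeouts:
      return self._timeouts.pop(0)
    return None


wrapper = _TimeoutStrategyWrapper(_DemoTimeoutStrategy().NextAttempt)

# Peek evaluates the next timeout lazily and caches it; repeated calls do
# not consume the strategy
assert wrapper.Peek() == 0.1
assert wrapper.Peek() == 0.1

# Next returns the cached value and clears it, so the strategy only
# advances once the value has actually been used
assert wrapper.Next() == 0.1
assert wrapper.Peek() == 0.5
assert wrapper.Next() == 0.5
assert wrapper.Next() == 2.0

# Exhausted: a None timeout means the next acquire should block;
# _OpExecContext.CheckPriorityIncrease looks at exactly this Peek() result
# to decide when to raise the opcode priority instead
assert wrapper.Peek() is None

Keeping Peek separate from Next is what lets CheckPriorityIncrease inspect the upcoming timeout without consuming it.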
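The snippet below, also not part of the patch, sketches the lock-retry behaviour the patch builds around opcode execution: each processor round asks the timeout strategy for a non-blocking acquire timeout, a timed-out acquire leaves the opcode in WAITLOCK and keeps its context in job.cur_opctx, and once the strategy is exhausted _OpExecContext.CheckPriorityIncrease raises the opcode priority and resets the strategy, until the final round uses a blocking acquire at the highest priority; the worker then re-queues itself via workerpool.DeferTask(priority=job.CalcPriority()). Everything named _Demo*, _Fake* or _Simulate* is made up for illustration, as are the stand-in priority constants (kept small so the walk stays short; the real values live in constants.py). Only _TimeoutStrategyWrapper is taken from this patch.

# Illustration only -- not part of the patch.
_OP_PRIO_HIGHEST = -2   # stand-in; numerically lower means more important
_OP_PRIO_DEFAULT = 0    # stand-in


class _DemoTimeoutStrategy:
  """Stand-in for mcpu.LockAttemptTimeoutStrategy.

  Returns a few growing timeouts, then None, meaning the next lock
  acquire should be a blocking one.

  """
  def __init__(self):
    self._timeouts = [0.1, 0.5, 2.0]

  def NextAttempt(self):
    if self._timeouts:
      return self._timeouts.pop(0)
    return None


class _FakeOp:
  """Minimal stand-in for a queued opcode, carrying only a priority."""
  def __init__(self):
    self.priority = _OP_PRIO_DEFAULT


def _SimulateLockRetries(op, strategy_factory):
  """Mimics the per-opcode retry behaviour of _OpExecContext.

  Each loop iteration stands for one _JobProcessor round in which the
  locks could not be acquired; the result is a list of (priority,
  timeout) pairs, one per acquire attempt.

  """
  wrapper = _TimeoutStrategyWrapper(strategy_factory().NextAttempt)
  attempts = []

  while True:
    timeout = wrapper.Next()
    attempts.append((op.priority, timeout))

    if timeout is None:
      # Blocking acquire at the current priority; in the real code this
      # attempt can no longer raise mcpu.LockAcquireTimeout
      return attempts

    # The timed attempt "failed"; mirror CheckPriorityIncrease: once the
    # strategy is exhausted, raise the priority and start a fresh strategy
    if wrapper.Peek() is None and op.priority > _OP_PRIO_HIGHEST:
      op.priority -= 1
      wrapper = _TimeoutStrategyWrapper(strategy_factory().NextAttempt)


attempts = _SimulateLockRetries(_FakeOp(), _DemoTimeoutStrategy)

# Three timed attempts per priority level (0, -1, -2), followed by one
# blocking acquire at the stand-in highest priority
assert attempts[0] == (_OP_PRIO_DEFAULT, 0.1)
assert attempts[-1] == (_OP_PRIO_HIGHEST, None)
assert len(attempts) == 10

Doing one attempt per worker round, instead of looping inside _ExecOpCodeUnlocked, lets the worker pool re-schedule the job at its updated priority through DeferTask, and gives a cancellation issued while the opcode waits in WAITLOCK a chance to be seen, which is what the OP_STATUS_CANCELING check after LockAcquireTimeout is for.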