X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/42e320754feb026f1bff943c3599681db37089da..a3de2ae783ce4fac89ca914a88e35ed81a1cb722:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index d7cf580..ddf941e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -176,7 +176,7 @@ class _QueuedJob(object): """ # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", "__weakref__"] @@ -203,6 +203,16 @@ class _QueuedJob(object): self.start_timestamp = None self.end_timestamp = None + self._InitInMemory(self) + + @staticmethod + def _InitInMemory(obj): + """Initializes in-memory variables. + + """ + obj.ops_iter = None + obj.cur_opctx = None + def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), "id=%s" % self.id, @@ -237,6 +247,8 @@ class _QueuedJob(object): obj.log_serial = max(obj.log_serial, log_entry[0]) obj.ops.append(op) + cls._InitInMemory(obj) + return obj def Serialize(self): @@ -364,6 +376,8 @@ class _QueuedJob(object): row.append(self.id) elif fname == "status": row.append(self.CalcStatus()) + elif fname == "priority": + row.append(self.CalcPriority()) elif fname == "ops": row.append([op.input.__getstate__() for op in self.ops]) elif fname == "opresult": @@ -378,6 +392,8 @@ class _QueuedJob(object): row.append([op.exec_timestamp for op in self.ops]) elif fname == "opend": row.append([op.end_timestamp for op in self.ops]) + elif fname == "oppriority": + row.append([op.priority for op in self.ops]) elif fname == "received_ts": row.append(self.received_timestamp) elif fname == "start_ts": @@ -410,6 +426,30 @@ class _QueuedJob(object): op.result = result not_marked = False + def Cancel(self): + """Marks job as canceled/-ing if possible. + + @rtype: tuple; (bool, string) + @return: Boolean describing whether job was successfully canceled or marked + as canceling and a text message + + """ + status = self.CalcStatus() + + if status == constants.JOB_STATUS_QUEUED: + self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + return (True, "Job %s canceled" % self.id) + + elif status == constants.JOB_STATUS_WAITLOCK: + # The worker will notice the new status and cancel the job + self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) + return (True, "Job %s will be canceled" % self.id) + + else: + logging.debug("Job %s is no longer waiting in the queue", self.id) + return (False, "Job %s is no longer waiting in the queue" % self.id) + class _OpExecCallbacks(mcpu.OpExecCbBase): def __init__(self, queue, job, op): @@ -707,6 +747,371 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _TimeoutStrategyWrapper: + def __init__(self, fn): + """Initializes this class. + + """ + self._fn = fn + self._next = None + + def _Advance(self): + """Gets the next timeout if necessary. + + """ + if self._next is None: + self._next = self._fn() + + def Peek(self): + """Returns the next timeout. + + """ + self._Advance() + return self._next + + def Next(self): + """Returns the current timeout and advances the internal state. + + """ + self._Advance() + result = self._next + self._next = None + return result + + +class _OpExecContext: + def __init__(self, op, index, log_prefix, timeout_strategy_factory): + """Initializes this class. 
+ + """ + self.op = op + self.index = index + self.log_prefix = log_prefix + self.summary = op.input.Summary() + + self._timeout_strategy_factory = timeout_strategy_factory + self._ResetTimeoutStrategy() + + def _ResetTimeoutStrategy(self): + """Creates a new timeout strategy. + + """ + self._timeout_strategy = \ + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) + + def CheckPriorityIncrease(self): + """Checks whether priority can and should be increased. + + Called when locks couldn't be acquired. + + """ + op = self.op + + # Exhausted all retries and next round should not use blocking acquire + # for locks? + if (self._timeout_strategy.Peek() is None and + op.priority > constants.OP_PRIO_HIGHEST): + logging.debug("Increasing priority") + op.priority -= 1 + self._ResetTimeoutStrategy() + return True + + return False + + def GetNextLockTimeout(self): + """Returns the next lock acquire timeout. + + """ + return self._timeout_strategy.Next() + + +class _JobProcessor(object): + def __init__(self, queue, opexec_fn, job, + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): + """Initializes this class. + + """ + self.queue = queue + self.opexec_fn = opexec_fn + self.job = job + self._timeout_strategy_factory = _timeout_strategy_factory + + @staticmethod + def _FindNextOpcode(job, timeout_strategy_factory): + """Locates the next opcode to run. + + @type job: L{_QueuedJob} + @param job: Job object + @param timeout_strategy_factory: Callable to create new timeout strategy + + """ + # Create some sort of a cache to speed up locating next opcode for future + # lookups + # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for + # pending and one for processed ops. + if job.ops_iter is None: + job.ops_iter = enumerate(job.ops) + + # Find next opcode to run + while True: + try: + (idx, op) = job.ops_iter.next() + except StopIteration: + raise errors.ProgrammerError("Called for a finished job") + + if op.status == constants.OP_STATUS_RUNNING: + # Found an opcode already marked as running + raise errors.ProgrammerError("Called for job marked as running") + + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), + timeout_strategy_factory) + + if op.status == constants.OP_STATUS_CANCELED: + # Cancelled jobs are handled by the caller + assert not compat.any(i.status != constants.OP_STATUS_CANCELED + for i in job.ops[idx:]) + + elif op.status in constants.OPS_FINALIZED: + # This is a job that was partially completed before master daemon + # shutdown, so it can be expected that some opcodes are already + # completed successfully (if any did error out, then the whole job + # should have been aborted and not resubmitted for processing). + logging.info("%s: opcode %s already processed, skipping", + opctx.log_prefix, opctx.summary) + continue + + return opctx + + @staticmethod + def _MarkWaitlock(job, op): + """Marks an opcode as waiting for locks. + + The job's start timestamp is also set if necessary. 
+ + @type job: L{_QueuedJob} + @param job: Job object + @type op: L{_QueuedOpCode} + @param op: Opcode object + + """ + assert op in job.ops + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + update = False + + op.result = None + + if op.status == constants.OP_STATUS_QUEUED: + op.status = constants.OP_STATUS_WAITLOCK + update = True + + if op.start_timestamp is None: + op.start_timestamp = TimeStampNow() + update = True + + if job.start_timestamp is None: + job.start_timestamp = op.start_timestamp + update = True + + assert op.status == constants.OP_STATUS_WAITLOCK + + return update + + def _ExecOpCodeUnlocked(self, opctx): + """Processes one opcode and returns the result. + + """ + op = opctx.op + + assert op.status == constants.OP_STATUS_WAITLOCK + + timeout = opctx.GetNextLockTimeout() + + try: + # Make sure not to hold queue lock while calling ExecOpCode + result = self.opexec_fn(op.input, + _OpExecCallbacks(self.queue, self.job, op), + timeout=timeout, priority=op.priority) + except mcpu.LockAcquireTimeout: + assert timeout is not None, "Received timeout for blocking acquire" + logging.debug("Couldn't acquire locks in %0.6fs", timeout) + + assert op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + + # Was job cancelled while we were waiting for the lock? + if op.status == constants.OP_STATUS_CANCELING: + return (constants.OP_STATUS_CANCELING, None) + + # Stay in waitlock while trying to re-acquire lock + return (constants.OP_STATUS_WAITLOCK, None) + except CancelJob: + logging.exception("%s: Canceling job", opctx.log_prefix) + assert op.status == constants.OP_STATUS_CANCELING + return (constants.OP_STATUS_CANCELING, None) + except Exception, err: # pylint: disable-msg=W0703 + logging.exception("%s: Caught exception in %s", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) + else: + logging.debug("%s: %s successful", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_SUCCESS, result) + + def __call__(self, _nextop_fn=None): + """Continues execution of a job. + + @param _nextop_fn: Callback function for tests + @rtype: bool + @return: True if job is finished, False if processor needs to be called + again + + """ + queue = self.queue + job = self.job + + logging.debug("Processing job %s", job.id) + + queue.acquire(shared=1) + try: + opcount = len(job.ops) + + # Is a previous opcode still pending? 
+ if job.cur_opctx: + opctx = job.cur_opctx + job.cur_opctx = None + else: + if __debug__ and _nextop_fn: + _nextop_fn() + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) + + op = opctx.op + + # Consistency check + assert compat.all(i.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED) + for i in job.ops[opctx.index + 1:]) + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED) + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + if op.status not in (constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED): + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + # Prepare to start opcode + if self._MarkWaitlock(job, op): + # Write to disk + queue.UpdateJobUnlocked(job) + + assert op.status == constants.OP_STATUS_WAITLOCK + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK + assert job.start_timestamp and op.start_timestamp + + logging.info("%s: opcode %s waiting for locks", + opctx.log_prefix, opctx.summary) + + queue.release() + try: + (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) + finally: + queue.acquire(shared=1) + + op.status = op_status + op.result = op_result + + if op.status == constants.OP_STATUS_WAITLOCK: + # Couldn't get locks in time + assert not op.end_timestamp + else: + # Finalize opcode + op.end_timestamp = TimeStampNow() + + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opctx.index:]) + else: + assert op.status in constants.OPS_FINALIZED + + if op.status == constants.OP_STATUS_WAITLOCK: + finalize = False + + if opctx.CheckPriorityIncrease(): + # Priority was changed, need to update on-disk file + queue.UpdateJobUnlocked(job) + + # Keep around for another round + job.cur_opctx = opctx + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK + + else: + # Ensure all opcodes so far have been successful + assert (opctx.index == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opctx.index])) + + # Reset context + job.cur_opctx = None + + if op.status == constants.OP_STATUS_SUCCESS: + finalize = False + + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opctx.index].result) + + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True + + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opctx.index:]) + + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + elif op.status == constants.OP_STATUS_CANCELED: + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + # Finalizing or last opcode? + if finalize or opctx.index == (opcount - 1): + # All opcodes have been run, finalize job + job.end_timestamp = TimeStampNow() + + # Write to disk. If the job status is final, this is the final write + # allowed. 
Once the file has been written, it can be archived anytime. + queue.UpdateJobUnlocked(job) + + if finalize or opctx.index == (opcount - 1): + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return True + + return False + finally: + queue.release() + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -714,125 +1119,23 @@ class _JobQueueWorker(workerpool.BaseWorker): def RunTask(self, job): # pylint: disable-msg=W0221 """Job executor. - This functions processes a job. It is closely tied to the _QueuedJob and - _QueuedOpCode classes. + This functions processes a job. It is closely tied to the L{_QueuedJob} and + L{_QueuedOpCode} classes. @type job: L{_QueuedJob} @param job: the job to be processed """ + queue = job.queue + assert queue == self.pool.queue + self.SetTaskName("Job%s" % job.id) - logging.info("Processing job %s", job.id) - proc = mcpu.Processor(self.pool.queue.context, job.id) - queue = job.queue - try: - try: - count = len(job.ops) - for idx, op in enumerate(job.ops): - op_summary = op.input.Summary() - if op.status == constants.OP_STATUS_SUCCESS: - # this is a job that was partially completed before master - # daemon shutdown, so it can be expected that some opcodes - # are already completed successfully (if any did error - # out, then the whole job should have been aborted and not - # resubmitted for processing) - logging.info("Op %s/%s: opcode %s already processed, skipping", - idx + 1, count, op_summary) - continue - try: - logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, - op_summary) - - queue.acquire(shared=1) - try: - if op.status == constants.OP_STATUS_CANCELED: - logging.debug("Canceling opcode") - raise CancelJob() - assert op.status == constants.OP_STATUS_QUEUED - logging.debug("Opcode %s/%s waiting for locks", - idx + 1, count) - op.status = constants.OP_STATUS_WAITLOCK - op.result = None - op.start_timestamp = TimeStampNow() - if idx == 0: # first opcode - job.start_timestamp = op.start_timestamp - queue.UpdateJobUnlocked(job) - - input_opcode = op.input - finally: - queue.release() - - # Make sure not to hold queue lock while calling ExecOpCode - result = proc.ExecOpCode(input_opcode, - _OpExecCallbacks(queue, job, op)) - - queue.acquire(shared=1) - try: - logging.debug("Opcode %s/%s succeeded", idx + 1, count) - op.status = constants.OP_STATUS_SUCCESS - op.result = result - op.end_timestamp = TimeStampNow() - if idx == count - 1: - job.end_timestamp = TimeStampNow() - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops) - - queue.UpdateJobUnlocked(job) - finally: - queue.release() - - logging.info("Op %s/%s: Successfully finished opcode %s", - idx + 1, count, op_summary) - except CancelJob: - # Will be handled further up - raise - except Exception, err: - queue.acquire(shared=1) - try: - try: - logging.debug("Opcode %s/%s failed", idx + 1, count) - op.status = constants.OP_STATUS_ERROR - op.result = _EncodeOpError(err) - op.end_timestamp = TimeStampNow() - logging.info("Op %s/%s: Error in opcode %s: %s", - idx + 1, count, op_summary, err) - - to_encode = errors.OpExecError("Preceding opcode failed") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:idx]) - assert compat.all(i.status == constants.OP_STATUS_ERROR and - errors.GetEncodedError(i.result) - for i in job.ops[idx:]) - finally: - job.end_timestamp = 
TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - raise - - except CancelJob: - queue.acquire(shared=1) - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - except errors.GenericError, err: - logging.exception("Ganeti exception") - except: - logging.exception("Unhandled exception") - finally: - status = job.CalcStatus() - logging.info("Finished job %s, status = %s", job.id, status) + proc = mcpu.Processor(queue.context, job.id) + + if not _JobProcessor(queue, proc.ExecOpCode, job)(): + # Schedule again + raise workerpool.DeferTask(priority=job.CalcPriority()) class _JobQueueWorkerPool(workerpool.WorkerPool): @@ -948,6 +1251,8 @@ class JobQueue(object): """ logging.info("Inspecting job queue") + restartjobs = [] + all_job_ids = self._GetJobIDsUnlocked() jobs_count = len(all_job_ids) lastinfo = time.time() @@ -967,17 +1272,28 @@ class JobQueue(object): status = job.CalcStatus() - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask((job, )) + if status == constants.JOB_STATUS_QUEUED: + restartjobs.append(job) elif status in (constants.JOB_STATUS_RUNNING, constants.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") + + if status == constants.JOB_STATUS_WAITLOCK: + # Restart job + job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) + restartjobs.append(job) + else: + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") + self.UpdateJobUnlocked(job) + if restartjobs: + logging.info("Restarting %s jobs", len(restartjobs)) + self._EnqueueJobs(restartjobs) + logging.info("Job queue inspection finished") @locking.ssynchronized(_LOCK) @@ -1407,7 +1723,7 @@ class JobQueue(object): """ job_id = self._NewSerialsUnlocked(1)[0] - self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), )) + self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) return job_id @locking.ssynchronized(_LOCK) @@ -1419,21 +1735,32 @@ class JobQueue(object): """ results = [] - tasks = [] + added_jobs = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) for job_id, ops in zip(all_job_ids, jobs): try: - tasks.append((self._SubmitJobUnlocked(job_id, ops), )) + added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) status = True data = job_id except errors.GenericError, err: data = str(err) status = False results.append((status, data)) - self._wpool.AddManyTasks(tasks) + + self._EnqueueJobs(added_jobs) return results + def _EnqueueJobs(self, jobs): + """Helper function to add jobs to worker pool's queue. + + @type jobs: list + @param jobs: List of all jobs + + """ + self._wpool.AddManyTasks([(job, ) for job in jobs], + priority=[job.CalcPriority() for job in jobs]) + @_RequireOpenQueue def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage. 
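
The rewritten RunTask above boils down to a retry loop spread over many worker-pool invocations: each call runs one opcode attempt, and once a series of lock timeouts is exhausted the opcode priority is raised. A minimal, self-contained sketch of that loop follows; it is not the Ganeti code itself — the timeout series, the OP_PRIO_HIGHEST value, the run_with_escalation helper and the acquire_fn callback are illustrative assumptions, and unlike _JobProcessor it loops in place instead of returning to the worker pool between attempts.

OP_PRIO_HIGHEST = -20

def run_with_escalation(acquire_fn, priority, timeouts=(0.1, 1.0, 5.0)):
  """Calls acquire_fn(timeout, priority) until it returns True.

  acquire_fn must return True on success and False when the timeout expired;
  a timeout of None means "block until the locks are acquired".

  """
  pending = list(timeouts)
  while True:
    # An exhausted series means the next attempt is a blocking acquire
    timeout = pending.pop(0) if pending else None
    if acquire_fn(timeout, priority):
      return priority
    if timeout is None:
      # Mirrors the assertion in _ExecOpCodeUnlocked: a blocking acquire
      # must not report a timeout
      raise RuntimeError("Blocking acquire must not time out")
    if not pending and priority > OP_PRIO_HIGHEST:
      # Series exhausted: raise the priority (lower number wins) and start
      # a fresh series, mirroring _OpExecContext.CheckPriorityIncrease
      priority -= 1
      pending = list(timeouts)

# Example: the fourth attempt succeeds, after one priority escalation
attempts = []

def fake_acquire(timeout, priority, _log=attempts):
  _log.append((timeout, priority))
  return len(_log) >= 4

assert run_with_escalation(fake_acquire, 0, timeouts=(0.1, 1.0)) == -1
assert attempts == [(0.1, 0), (1.0, 0), (0.1, -1), (1.0, -1)]

In the real code the per-opcode state is kept in job.cur_opctx and the worker re-queues itself by raising workerpool.DeferTask(priority=job.CalcPriority()), so other jobs get a chance to run while the locks are contended.
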
@@ -1502,26 +1829,12 @@ class JobQueue(object): logging.debug("Job %s not found", job_id) return (False, "Job %s not found" % job_id) - job_status = job.CalcStatus() + (success, msg) = job.Cancel() - if job_status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_WAITLOCK): - logging.debug("Job %s is no longer waiting in the queue", job.id) - return (False, "Job %s is no longer waiting in the queue" % job.id) - - if job_status == constants.JOB_STATUS_QUEUED: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - msg = "Job %s canceled" % job.id - - elif job_status == constants.JOB_STATUS_WAITLOCK: - # The worker will notice the new status and cancel the job - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) - msg = "Job %s will be canceled" % job.id - - self.UpdateJobUnlocked(job) + if success: + self.UpdateJobUnlocked(job) - return (True, msg) + return (success, msg) @_RequireOpenQueue def _ArchiveJobsUnlocked(self, jobs):
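
The last hunk moves the per-status decisions out of JobQueue.CancelJob and into _QueuedJob.Cancel. A compact sketch of the resulting state machine, using simplified status strings in place of the constants.JOB_STATUS_* values and a hypothetical cancel_outcome helper in place of the real method:

def cancel_outcome(status):
  """Returns (success, action) for a cancellation request."""
  if status == "queued":
    # No opcode has started yet: all opcodes are marked CANCELED right away
    # and the job is finalized
    return (True, "opcodes marked CANCELED")
  if status == "waitlock":
    # An opcode is waiting for locks: it is only marked CANCELING; the worker
    # notices the new status and completes the cancellation later
    return (True, "opcodes marked CANCELING")
  # Running, finished or already canceled jobs can no longer be canceled
  return (False, "job is no longer waiting in the queue")

assert cancel_outcome("queued") == (True, "opcodes marked CANCELED")
assert cancel_outcome("waitlock") == (True, "opcodes marked CANCELING")
assert not cancel_outcome("success")[0]

Only in the two successful cases does CancelJob still call UpdateJobUnlocked, which is why the refactored method returns the success flag alongside the message.
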