X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/78d12585b49b6831e43be5c3e27cfcc47a01345b..944bf54895c1d4491c6d06ad464aa6e97844c366:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index dbfc49b..90757a5 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -344,7 +344,7 @@ class _JobQueueWorker(workerpool.BaseWorker): @param job: the job to be processed """ - logging.debug("Worker %s processing job %s", + logging.info("Worker %s processing job %s", self.worker_id, job.id) proc = mcpu.Processor(self.pool.queue.context) self.queue = queue = job.queue @@ -352,11 +352,15 @@ class _JobQueueWorker(workerpool.BaseWorker): try: count = len(job.ops) for idx, op in enumerate(job.ops): + op_summary = op.input.Summary() try: - logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, + op_summary) queue.acquire() try: + if op.status == constants.OP_STATUS_CANCELED: + raise CancelJob() assert op.status == constants.OP_STATUS_QUEUED job.run_op_index = idx op.status = constants.OP_STATUS_WAITLOCK @@ -408,8 +412,8 @@ class _JobQueueWorker(workerpool.BaseWorker): finally: queue.release() - logging.debug("Op %s/%s: Successfully finished %s", - idx + 1, count, op) + logging.info("Op %s/%s: Successfully finished opcode %s", + idx + 1, count, op_summary) except CancelJob: # Will be handled further up raise @@ -420,7 +424,8 @@ class _JobQueueWorker(workerpool.BaseWorker): op.status = constants.OP_STATUS_ERROR op.result = str(err) op.end_timestamp = TimeStampNow() - logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) + logging.info("Op %s/%s: Error in opcode %s: %s", + idx + 1, count, op_summary, err) finally: queue.UpdateJobUnlocked(job) finally: @@ -449,8 +454,8 @@ class _JobQueueWorker(workerpool.BaseWorker): status = job.CalcStatus() finally: queue.release() - logging.debug("Worker %s finished job %s, status = %s", - self.worker_id, job_id, status) + logging.info("Worker %s finished job %s, status = %s", + self.worker_id, job_id, status) class _JobQueueWorkerPool(workerpool.WorkerPool): @@ -703,24 +708,24 @@ class JobQueue(object): self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) - def _RenameFileUnlocked(self, old, new): + def _RenameFilesUnlocked(self, rename): """Renames a file locally and then replicate the change. This function will rename a file in the local queue directory and then replicate this rename to all the other nodes we have. - @type old: str - @param old: the current name of the file - @type new: str - @param new: the new name of the file + @type rename: list of (old, new) + @param rename: List containing tuples mapping old to new names """ - utils.RenameFile(old, new, mkdir=True) + # Rename them locally + for old, new in rename: + utils.RenameFile(old, new, mkdir=True) + # ... and on all nodes names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new) - self._CheckRpcResult(result, self._nodes, - "Moving %s to %s" % (old, new)) + result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) def _FormatJobID(self, job_id): """Convert a job ID to string format. @@ -886,7 +891,7 @@ class JobQueue(object): else: # non-archived case logging.exception("Can't parse job %s, will archive.", job_id) - self._RenameFileUnlocked(filepath, new_path) + self._RenameFilesUnlocked([(filepath, new_path)]) return None self._memcache[job_id] = job @@ -938,9 +943,8 @@ class JobQueue(object): utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) return True - @utils.LockedMethod @_RequireOpenQueue - def SubmitJob(self, ops): + def _SubmitJobUnlocked(self, ops): """Create and store a new job. This enters the job into our job queue and also puts it on the new @@ -954,7 +958,7 @@ class JobQueue(object): """ if self._IsQueueMarkedDrain(): - raise errors.JobQueueDrainError() + raise errors.JobQueueDrainError("Job queue is drained, refusing job") # Check job queue size size = len(self._ListJobFiles()) @@ -982,6 +986,37 @@ class JobQueue(object): return job.id + @utils.LockedMethod + @_RequireOpenQueue + def SubmitJob(self, ops): + """Create and store a new job. + + @see: L{_SubmitJobUnlocked} + + """ + return self._SubmitJobUnlocked(ops) + + @utils.LockedMethod + @_RequireOpenQueue + def SubmitManyJobs(self, jobs): + """Create and store multiple jobs. + + @see: L{_SubmitJobUnlocked} + + """ + results = [] + for ops in jobs: + try: + data = self._SubmitJobUnlocked(ops) + status = True + except errors.GenericError, err: + data = str(err) + status = False + results.append((status, data)) + + return results + + @_RequireOpenQueue def UpdateJobUnlocked(self, job): """Update a job's on disk storage. @@ -1117,42 +1152,50 @@ class JobQueue(object): """ try: for op in job.ops: - op.status = constants.OP_STATUS_ERROR + op.status = constants.OP_STATUS_CANCELED op.result = "Job canceled by request" finally: self.UpdateJobUnlocked(job) @_RequireOpenQueue - def _ArchiveJobUnlocked(self, job): - """Archives a job. + def _ArchiveJobsUnlocked(self, jobs): + """Archives jobs. - @type job: L{_QueuedJob} - @param job: Job object - @rtype bool - @return Whether job was archived + @type jobs: list of L{_QueuedJob} + @param jobs: Job objects + @rtype: int + @return: Number of archived jobs """ - if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, - constants.JOB_STATUS_SUCCESS, - constants.JOB_STATUS_ERROR): - logging.debug("Job %s is not yet done", job.id) - return False + archive_jobs = [] + rename_files = [] + for job in jobs: + if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, + constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR): + logging.debug("Job %s is not yet done", job.id) + continue - old = self._GetJobPath(job.id) - new = self._GetArchivedJobPath(job.id) + archive_jobs.append(job) - self._RenameFileUnlocked(old, new) + old = self._GetJobPath(job.id) + new = self._GetArchivedJobPath(job.id) + rename_files.append((old, new)) - logging.debug("Successfully archived job %s", job.id) + # TODO: What if 1..n files fail to rename? + self._RenameFilesUnlocked(rename_files) - return True + logging.debug("Successfully archived job(s) %s", + ", ".join(job.id for job in archive_jobs)) + + return len(archive_jobs) @utils.LockedMethod @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. - This is just a wrapper over L{_ArchiveJobUnlocked}. + This is just a wrapper over L{_ArchiveJobsUnlocked}. @type job_id: string @param job_id: Job ID of job to be archived. @@ -1167,11 +1210,11 @@ class JobQueue(object): logging.debug("Job %s not found", job_id) return False - return self._ArchiveJobUnlocked(job) + return self._ArchiveJobsUnlocked([job]) == 1 @utils.LockedMethod @_RequireOpenQueue - def AutoArchiveJobs(self, age): + def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. The method will archive all jobs which are older than the age @@ -1186,22 +1229,44 @@ class JobQueue(object): logging.info("Archiving jobs with age more than %s seconds", age) now = time.time() - for job_id in self._GetJobIDsUnlocked(archived=False): + end_time = now + timeout + archived_count = 0 + last_touched = 0 + + all_job_ids = self._GetJobIDsUnlocked(archived=False) + pending = [] + for idx, job_id in enumerate(all_job_ids): + last_touched = idx + + # Not optimal because jobs could be pending + # TODO: Measure average duration for job archival and take number of + # pending jobs into account. + if time.time() > end_time: + break + # Returns None if the job failed to load job = self._LoadJobUnlocked(job_id) - if not job: - continue - - if job.end_timestamp is None: - if job.start_timestamp is None: - job_age = job.received_timestamp + if job: + if job.end_timestamp is None: + if job.start_timestamp is None: + job_age = job.received_timestamp + else: + job_age = job.start_timestamp else: - job_age = job.start_timestamp - else: - job_age = job.end_timestamp + job_age = job.end_timestamp + + if age == -1 or now - job_age[0] > age: + pending.append(job) + + # Archive 10 jobs at a time + if len(pending) >= 10: + archived_count += self._ArchiveJobsUnlocked(pending) + pending = [] + + if pending: + archived_count += self._ArchiveJobsUnlocked(pending) - if age == -1 or now - job_age[0] > age: - self._ArchiveJobUnlocked(job) + return (archived_count, len(all_job_ids) - last_touched - 1) def _GetJobInfoUnlocked(self, job, fields): """Returns information about a job.