X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/33987705ce06b4c2fa0112de9e988f305d552c1f..fc8a6b8f3334c1352caafcede1e31d801fc146ba:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 66009fb..8657fed 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -47,7 +47,15 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 
+
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
+
+
+class CancelJob(Exception):
+  """Special exception to cancel a job.
+
+  """
 
 
 def TimeStampNow():
@@ -61,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -72,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QueuedOpCode.
@@ -144,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
@@ -232,6 +249,7 @@ class _QueuedJob(object):
       status will be the same
     - otherwise, the last opcode with the status one of:
         - waitlock
+        - canceling
         - running
       will determine the job status
@@ -257,6 +275,9 @@ class _QueuedJob(object):
         status = constants.JOB_STATUS_WAITLOCK
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
+      elif op.status == constants.OP_STATUS_CANCELING:
+        status = constants.JOB_STATUS_CANCELING
+        break
       elif op.status == constants.OP_STATUS_ERROR:
         status = constants.JOB_STATUS_ERROR
         # The whole job fails if one opcode failed
@@ -274,7 +295,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -292,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is a utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
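
The CalcStatus() change above lets a single canceling opcode decide the whole
job's status. A standalone sketch of that derivation (names and constant
values are invented; the real constants live in ganeti.constants):

    # Mirrors _QueuedJob.CalcStatus with the new canceling short-circuit.
    OP_QUEUED, OP_WAITLOCK, OP_RUNNING, OP_CANCELING, OP_ERROR, OP_SUCCESS = range(6)

    def calc_status(op_statuses):
      status = "queued"
      all_success = True
      for op_status in op_statuses:
        if op_status == OP_SUCCESS:
          continue
        all_success = False
        if op_status == OP_WAITLOCK:
          status = "waiting"
        elif op_status == OP_RUNNING:
          status = "running"
        elif op_status == OP_CANCELING:
          status = "canceling"   # one canceling opcode decides the job
          break
        elif op_status == OP_ERROR:
          status = "error"       # the whole job fails if one opcode failed
          break
      if all_success:
        status = "success"
      return status

    assert calc_status([OP_SUCCESS, OP_CANCELING, OP_QUEUED]) == "canceling"
    assert calc_status([OP_SUCCESS, OP_SUCCESS]) == "success"
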
@@ -311,6 +352,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     self.queue.acquire()
     try:
+      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+                                    constants.OP_STATUS_CANCELING)
+
+      # Cancel here if we were asked to
+      if self.opcode.status == constants.OP_STATUS_CANCELING:
+        raise CancelJob()
+
       self.opcode.status = constants.OP_STATUS_RUNNING
     finally:
       self.queue.release()
@@ -325,7 +373,7 @@
     @param job: the job to be processed
 
     """
-    logging.debug("Worker %s processing job %s",
+    logging.info("Worker %s processing job %s",
                   self.worker_id, job.id)
     proc = mcpu.Processor(self.pool.queue.context)
     self.queue = queue = job.queue
@@ -333,11 +381,25 @@
     try:
       count = len(job.ops)
       for idx, op in enumerate(job.ops):
+        op_summary = op.input.Summary()
+        if op.status == constants.OP_STATUS_SUCCESS:
+          # this is a job that was partially completed before master
+          # daemon shutdown, so it can be expected that some opcodes
+          # are already completed successfully (if any did error
+          # out, then the whole job should have been aborted and not
+          # resubmitted for processing)
+          logging.info("Op %s/%s: opcode %s already processed, skipping",
+                       idx + 1, count, op_summary)
+          continue
         try:
-          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
+          logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
+                       op_summary)
 
           queue.acquire()
           try:
+            if op.status == constants.OP_STATUS_CANCELED:
+              raise CancelJob()
+            assert op.status == constants.OP_STATUS_QUEUED
             job.run_op_index = idx
             op.status = constants.OP_STATUS_WAITLOCK
             op.result = None
@@ -388,22 +450,35 @@
           finally:
             queue.release()
 
-          logging.debug("Op %s/%s: Successfully finished %s",
-                        idx + 1, count, op)
+          logging.info("Op %s/%s: Successfully finished opcode %s",
+                       idx + 1, count, op_summary)
+        except CancelJob:
+          # Will be handled further up
+          raise
         except Exception, err:
           queue.acquire()
           try:
            try:
              op.status = constants.OP_STATUS_ERROR
-              op.result = str(err)
+              if isinstance(err, errors.GenericError):
+                op.result = errors.EncodeException(err)
+              else:
+                op.result = str(err)
               op.end_timestamp = TimeStampNow()
-              logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+              logging.info("Op %s/%s: Error in opcode %s: %s",
+                           idx + 1, count, op_summary, err)
             finally:
               queue.UpdateJobUnlocked(job)
           finally:
             queue.release()
           raise
 
+    except CancelJob:
+      queue.acquire()
+      try:
+        queue.CancelJobUnlocked(job)
+      finally:
+        queue.release()
     except errors.GenericError, err:
       logging.exception("Ganeti exception")
     except:
@@ -412,7 +487,7 @@
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -420,8 +495,8 @@
         status = job.CalcStatus()
       finally:
         queue.release()
-      logging.debug("Worker %s finished job %s, status = %s",
-                    self.worker_id, job_id, status)
+      logging.info("Worker %s finished job %s, status = %s",
+                   self.worker_id, job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
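
The pattern in these hunks is check-under-lock: an opcode's status is only
read and flipped while the queue lock is held, and cancellation unwinds the
opcode loop via the CancelJob exception. A minimal self-contained sketch of
the same pattern (threading stand-ins, not the real queue):

    import threading

    class CancelJob(Exception):
      """Stand-in for the CancelJob exception added above."""

    _lock = threading.Lock()

    def start_opcode(op):
      """Flip an opcode to running, unless it was canceled meanwhile."""
      _lock.acquire()
      try:
        if op["status"] == "canceling":
          # Unwinds the opcode loop; the job-level handler then calls
          # queue.CancelJobUnlocked(job) for the whole job.
          raise CancelJob()
        op["status"] = "running"
      finally:
        _lock.release()

    op = {"status": "canceling"}
    try:
      start_opcode(op)
    except CancelJob:
      op["status"] = "canceled"
    assert op["status"] == "canceled"
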
@@ -434,33 +509,37 @@
     self.queue = queue
 
 
-class JobQueue(object):
-  """Quue used to manaage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' attribute.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -494,7 +573,8 @@
 
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
-                       for n in self.context.cfg.GetAllNodesInfo().values())
+                       for n in self.context.cfg.GetAllNodesInfo().values()
+                       if n.master_candidate)
 
     # Remove master node
     try:
@@ -514,13 +594,14 @@
     logging.info("Inspecting job queue")
 
     all_job_ids = self._GetJobIDsUnlocked()
+    jobs_count = len(all_job_ids)
     lastinfo = time.time()
     for idx, job_id in enumerate(all_job_ids):
       # Give an update every 1000 jobs or 10 seconds
-      if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
-        jobs_count = len(all_job_ids)
+      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+          idx == (jobs_count - 1)):
         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                     idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
+                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
         lastinfo = time.time()
 
       job = self._LoadJobUnlocked(job_id)
@@ -535,12 +616,12 @@
         self._wpool.AddTask(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
-                      constants.JOB_STATUS_WAITLOCK):
+                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_CANCELING):
         logging.warning("Unfinished job %s found: %s", job.id, job)
         try:
-          for op in job.ops:
-            op.status = constants.OP_STATUS_ERROR
-            op.result = "Unclean master daemon shutdown"
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                "Unclean master daemon shutdown")
         finally:
           self.UpdateJobUnlocked(job)
 
@@ -566,6 +647,12 @@
     # Clean queue directory on added node
     rpc.RpcRunner.call_jobqueue_purge(node_name)
 
+    if not node.master_candidate:
+      # remove if existing, ignoring errors
+      self._nodes.pop(node_name, None)
+      # and skip the replication of the job ids
+      return
+
     # Upload the whole queue excluding archived jobs
     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
@@ -608,7 +695,7 @@
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fail.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
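
Moving _RequireOpenQueue to module level makes it an ordinary decorator that
merely asserts on an attribute of its first argument. A runnable sketch under
that assumption (FakeQueue is hypothetical):

    def _RequireOpenQueue(fn):
      def wrapper(self, *args, **kwargs):
        assert self._queue_lock is not None, "Queue should be open"
        return fn(self, *args, **kwargs)
      return wrapper

    class FakeQueue(object):
      def __init__(self):
        self._queue_lock = object()  # stand-in for the real lock

      @_RequireOpenQueue
      def Example(self):
        return "queue is open"

    q = FakeQueue()
    assert q.Example() == "queue is open"
    q._queue_lock = None  # simulate a closed queue
    try:
      q.Example()
    except AssertionError:
      pass  # exactly what the decorator is there to catch
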
@@ -665,24 +752,24 @@
 
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
-  def _RenameFileUnlocked(self, old, new):
+  def _RenameFilesUnlocked(self, rename):
     """Renames a file locally and then replicate the change.
 
     This function will rename a file in the local queue directory
     and then replicate this rename to all the other nodes we have.
 
-    @type old: str
-    @param old: the current name of the file
-    @type new: str
-    @param new: the new name of the file
+    @type rename: list of (old, new)
+    @param rename: List containing tuples mapping old to new names
 
     """
-    os.rename(old, new)
+    # Rename them locally
+    for old, new in rename:
+      utils.RenameFile(old, new, mkdir=True)
 
+    # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
-    self._CheckRpcResult(result, self._nodes,
-                         "Moving %s to %s" % (old, new))
+    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   def _FormatJobID(self, job_id):
     """Convert a job ID to string format.
@@ -704,26 +791,43 @@
 
     return str(job_id)
 
-  def _NewSerialUnlocked(self):
+  @classmethod
+  def _GetArchiveDirectory(cls, job_id):
+    """Returns the archive directory for a job.
+
+    @type job_id: str
+    @param job_id: Job identifier
+    @rtype: str
+    @return: Directory name
+
+    """
+    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
+
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
-    @rtype: str
-    @return: a string representing the job identifier.
+    @rtype: list of str
+    @return: a list of job identifiers.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
+
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -737,8 +841,8 @@
     """
     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
 
-  @staticmethod
-  def _GetArchivedJobPath(job_id):
+  @classmethod
+  def _GetArchivedJobPath(cls, job_id):
     """Returns the archived job file for a given job id.
 
     @type job_id: str
@@ -747,7 +851,8 @@
     @return: the path to the archived job file
 
     """
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
+    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
+    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -835,7 +940,7 @@
       else:
         # non-archived case
         logging.exception("Can't parse job %s, will archive.", job_id)
-        self._RenameFileUnlocked(filepath, new_path)
+        self._RenameFilesUnlocked([(filepath, new_path)])
       return None
 
     self._memcache[job_id] = job
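
Two quick checks of the arithmetic introduced here. _GetArchiveDirectory
buckets job files by integer division, so with JOBS_PER_ARCHIVE_DIRECTORY =
10000 no archive subdirectory accumulates more than 10000 entries; and
_NewSerialsUnlocked must return exactly count fresh serials, e.g. with
_last_serial == 5 and count == 3 the new IDs are 6, 7 and 8, hence the range
starting at _last_serial + 1. A sketch (// is used so it also runs under
Python 3; the module itself is Python 2 and uses /):

    JOBS_PER_ARCHIVE_DIRECTORY = 10000

    def archive_subdir(job_id):
      return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

    assert archive_subdir("42") == "0"        # stored as 0/job-42
    assert archive_subdir("123456") == "12"   # stored as 12/job-123456

    last_serial, count = 5, 3
    serial = last_serial + count
    assert list(range(last_serial + 1, serial + 1)) == [6, 7, 8]
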
@@ -878,7 +983,7 @@
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -887,14 +992,15 @@
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -903,9 +1009,19 @@
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    # Check job queue size
+    size = len(self._ListJobFiles())
+    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
+      # TODO: Autoarchive jobs. Make sure it's not done on every job
+      # submission, though.
+      #size = ...
+      pass
+
+    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+      raise errors.JobQueueFull()
+
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -919,6 +1035,39 @@
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -966,6 +1115,10 @@
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
+
+    job_info = None
+    log_entries = None
+
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
@@ -1007,7 +1160,10 @@
 
     logging.debug("Job %s changed", job_id)
 
-    return (job_info, log_entries)
+    if job_info is None and log_entries is None:
+      return None
+    else:
+      return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
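
SubmitManyJobs returns one (status, data) pair per requested job: data is the
new job ID when status is true, and an error message otherwise. A sketch of
consuming that shape (the literal values are invented):

    results = [(True, "17"), (True, "18"),
               (False, "Job queue is drained, refusing job")]

    submitted = [data for status, data in results if status]
    failed = [data for status, data in results if not status]
    assert submitted == ["17", "18"]
    assert failed == ["Job queue is drained, refusing job"]
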
""" - logging.debug("Cancelling job %s", job_id) + logging.info("Cancelling job %s", job_id) job = self._LoadJobUnlocked(job_id) if not job: logging.debug("Job %s not found", job_id) - return + return (False, "Job %s not found" % job_id) - if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,): - logging.debug("Job %s is no longer in the queue", job.id) - return + job_status = job.CalcStatus() + + if job_status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITLOCK): + logging.debug("Job %s is no longer waiting in the queue", job.id) + return (False, "Job %s is no longer waiting in the queue" % job.id) + + if job_status == constants.JOB_STATUS_QUEUED: + self.CancelJobUnlocked(job) + return (True, "Job %s canceled" % job.id) + + elif job_status == constants.JOB_STATUS_WAITLOCK: + # The worker will notice the new status and cancel the job + try: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) + finally: + self.UpdateJobUnlocked(job) + return (True, "Job %s will be canceled" % job.id) + @_RequireOpenQueue + def CancelJobUnlocked(self, job): + """Marks a job as canceled. + + """ try: - for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Job cancelled by request" + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") finally: self.UpdateJobUnlocked(job) @_RequireOpenQueue - def _ArchiveJobUnlocked(self, job_id): - """Archives a job. + def _ArchiveJobsUnlocked(self, jobs): + """Archives jobs. - @type job_id: string - @param job_id: Job ID of job to be archived. + @type jobs: list of L{_QueuedJob} + @param jobs: Job objects + @rtype: int + @return: Number of archived jobs """ - logging.info("Archiving job %s", job_id) + archive_jobs = [] + rename_files = [] + for job in jobs: + if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, + constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR): + logging.debug("Job %s is not yet done", job.id) + continue - job = self._LoadJobUnlocked(job_id) - if not job: - logging.debug("Job %s not found", job_id) - return + archive_jobs.append(job) - if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, - constants.JOB_STATUS_SUCCESS, - constants.JOB_STATUS_ERROR): - logging.debug("Job %s is not yet done", job.id) - return + old = self._GetJobPath(job.id) + new = self._GetArchivedJobPath(job.id) + rename_files.append((old, new)) - old = self._GetJobPath(job.id) - new = self._GetArchivedJobPath(job.id) + # TODO: What if 1..n files fail to rename? + self._RenameFilesUnlocked(rename_files) - self._RenameFileUnlocked(old, new) + logging.debug("Successfully archived job(s) %s", + ", ".join(job.id for job in archive_jobs)) - logging.debug("Successfully archived job %s", job.id) + return len(archive_jobs) @utils.LockedMethod @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. - This is just a wrapper over L{_ArchiveJobUnlocked}. + This is just a wrapper over L{_ArchiveJobsUnlocked}. @type job_id: string @param job_id: Job ID of job to be archived. + @rtype: bool + @return: Whether job was archived """ - return self._ArchiveJobUnlocked(job_id) + logging.info("Archiving job %s", job_id) + + job = self._LoadJobUnlocked(job_id) + if not job: + logging.debug("Job %s not found", job_id) + return False + + return self._ArchiveJobsUnlocked([job]) == 1 @utils.LockedMethod @_RequireOpenQueue - def AutoArchiveJobs(self, age): + def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. 
@@ -1096,22 +1285,44 @@
     logging.info("Archiving jobs with age more than %s seconds", age)
 
     now = time.time()
-    for jid in self._GetJobIDsUnlocked(archived=False):
-      job = self._LoadJobUnlocked(jid)
-      if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
-                                  constants.OP_STATUS_ERROR,
-                                  constants.OP_STATUS_CANCELED):
-        continue
-      if job.end_timestamp is None:
-        if job.start_timestamp is None:
-          job_age = job.received_timestamp
+    end_time = now + timeout
+    archived_count = 0
+    last_touched = 0
+
+    all_job_ids = self._GetJobIDsUnlocked(archived=False)
+    pending = []
+    for idx, job_id in enumerate(all_job_ids):
+      last_touched = idx
+
+      # Not optimal because jobs could be pending
+      # TODO: Measure average duration for job archival and take number of
+      # pending jobs into account.
+      if time.time() > end_time:
+        break
+
+      # Returns None if the job failed to load
+      job = self._LoadJobUnlocked(job_id)
+      if job:
+        if job.end_timestamp is None:
+          if job.start_timestamp is None:
+            job_age = job.received_timestamp
+          else:
+            job_age = job.start_timestamp
         else:
-          job_age = job.start_timestamp
-      else:
-        job_age = job.end_timestamp
+          job_age = job.end_timestamp
+
+        if age == -1 or now - job_age[0] > age:
+          pending.append(job)
+
+          # Archive 10 jobs at a time
+          if len(pending) >= 10:
+            archived_count += self._ArchiveJobsUnlocked(pending)
+            pending = []
+
+    if pending:
+      archived_count += self._ArchiveJobsUnlocked(pending)
 
-      if age == -1 or now - job_age[0] > age:
-        self._ArchiveJobUnlocked(jid)
+    return (archived_count, len(all_job_ids) - last_touched - 1)
 
   def _GetJobInfoUnlocked(self, job, fields):
     """Returns information about a job.
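
Reduced to its core, AutoArchiveJobs combines a time budget with batches of at
most ten jobs per replicated rename. A self-contained sketch of that loop
(archive_in_batches and its arguments are hypothetical; the real method also
derives each job's age from its received/start/end timestamps):

    import time

    def archive_in_batches(job_ids, archive_batch, timeout, batch_size=10):
      """Archive jobs in groups, stopping once the time budget is spent."""
      end_time = time.time() + timeout
      archived = 0
      pending = []
      for job_id in job_ids:
        if time.time() > end_time:
          break  # budget exhausted; the rest stays unarchived
        pending.append(job_id)
        if len(pending) >= batch_size:
          archived += archive_batch(pending)
          pending = []
      if pending:
        archived += archive_batch(pending)
      return archived

    # len() as the "archive" callback just counts each batch: 10 + 10 + 5
    assert archive_in_batches(list(range(25)), len, timeout=60) == 25
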