X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/99aabbed5d61268db84cc339ccae129b0d7f867e..944bf54895c1d4491c6d06ad464aa6e97844c366:/lib/jqueue.py?ds=sidebyside

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 86c3d86..90757a5 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -21,9 +21,11 @@
 """Module implementing the job queue handling.
 
-Locking:
-There's a single, large lock in the JobQueue class. It's used by all other
-classes in this module.
+Locking: there's a single, large lock in the L{JobQueue} class. It's
+used by all other classes in this module.
+
+@var JOBQUEUE_THREADS: the number of worker threads we start for
+    processing jobs
 
 """
 
 
@@ -45,23 +47,46 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 
-from ganeti.rpc import RpcRunner
 
 JOBQUEUE_THREADS = 25
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
+
+
+class CancelJob(Exception):
+  """Special exception to cancel a job.
+
+  """
 
 
 def TimeStampNow():
+  """Returns the current timestamp.
+
+  @rtype: tuple
+  @return: the current time in the (seconds, microseconds) format
+
+  """
   return utils.SplitTime(time.time())
 
 
 class _QueuedOpCode(object):
   """Encasulates an opcode object.
 
-  The 'log' attribute holds the execution log and consists of tuples
-  of the form (log_serial, timestamp, level, message).
+  @ivar log: holds the execution log and consists of tuples
+      of the form C{(log_serial, timestamp, level, message)}
+  @ivar input: the OpCode we encapsulate
+  @ivar status: the current status
+  @ivar result: the result of the LU execution
+  @ivar start_timestamp: timestamp for the start of the execution
+  @ivar stop_timestamp: timestamp for the end of the execution
 
   """
   def __init__(self, op):
+    """Constructor for the _QueuedOpCode.
+
+    @type op: L{opcodes.OpCode}
+    @param op: the opcode we encapsulate
+
+    """
     self.input = op
     self.status = constants.OP_STATUS_QUEUED
     self.result = None
@@ -71,6 +96,14 @@ class _QueuedOpCode(object):
 
   @classmethod
   def Restore(cls, state):
+    """Restore the _QueuedOpCode from the serialized form.
+
+    @type state: dict
+    @param state: the serialized state
+    @rtype: _QueuedOpCode
+    @return: a new _QueuedOpCode instance
+
+    """
     obj = _QueuedOpCode.__new__(cls)
     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
     obj.status = state["status"]
@@ -81,6 +114,12 @@ class _QueuedOpCode(object):
     return obj
 
   def Serialize(self):
+    """Serializes this _QueuedOpCode.
+
+    @rtype: dict
+    @return: the dictionary holding the serialized state
+
+    """
     return {
       "input": self.input.__getstate__(),
       "status": self.status,
@@ -94,13 +133,39 @@ class _QueuedOpCode(object):
 class _QueuedJob(object):
   """In-memory job representation.
 
-  This is what we use to track the user-submitted jobs. Locking must be taken
-  care of by users of this class.
+  This is what we use to track the user-submitted jobs. Locking must
+  be taken care of by users of this class.
+ + @type queue: L{JobQueue} + @ivar queue: the parent queue + @ivar id: the job ID + @type ops: list + @ivar ops: the list of _QueuedOpCode that constitute the job + @type run_op_index: int + @ivar run_op_index: the currently executing opcode, or -1 if + we didn't yet start executing + @type log_serial: int + @ivar log_serial: holds the index for the next log entry + @ivar received_timestamp: the timestamp for when the job was received + @ivar start_timestmap: the timestamp for start of execution + @ivar end_timestamp: the timestamp for end of execution + @ivar change: a Condition variable we use for waiting for job changes """ def __init__(self, queue, job_id, ops): + """Constructor for the _QueuedJob. + + @type queue: L{JobQueue} + @param queue: our parent queue + @type job_id: job_id + @param job_id: our job id + @type ops: list + @param ops: the list of opcodes we hold, which will be encapsulated + in _QueuedOpCodes + + """ if not ops: - # TODO + # TODO: use a better exception raise Exception("No opcodes") self.queue = queue @@ -117,6 +182,16 @@ class _QueuedJob(object): @classmethod def Restore(cls, queue, state): + """Restore a _QueuedJob from serialized state: + + @type queue: L{JobQueue} + @param queue: to which queue the restored job belongs + @type state: dict + @param state: the serialized state + @rtype: _JobQueue + @return: the restored _JobQueue instance + + """ obj = _QueuedJob.__new__(cls) obj.queue = queue obj.id = state["id"] @@ -139,6 +214,12 @@ class _QueuedJob(object): return obj def Serialize(self): + """Serialize the _JobQueue instance. + + @rtype: dict + @return: the serialized state + + """ return { "id": self.id, "ops": [op.Serialize() for op in self.ops], @@ -149,6 +230,27 @@ class _QueuedJob(object): } def CalcStatus(self): + """Compute the status of this job. + + This function iterates over all the _QueuedOpCodes in the job and + based on their status, computes the job status. + + The algorithm is: + - if we find a cancelled, or finished with error, the job + status will be the same + - otherwise, the last opcode with the status one of: + - waitlock + - canceling + - running + + will determine the job status + + - otherwise, it means either all opcodes are queued, or success, + and the job status will be the same + + @return: the job status + + """ status = constants.JOB_STATUS_QUEUED all_success = True @@ -164,6 +266,9 @@ class _QueuedJob(object): status = constants.JOB_STATUS_WAITLOCK elif op.status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING + elif op.status == constants.OP_STATUS_CANCELING: + status = constants.JOB_STATUS_CANCELING + break elif op.status == constants.OP_STATUS_ERROR: status = constants.JOB_STATUS_ERROR # The whole job fails if one opcode failed @@ -178,6 +283,16 @@ class _QueuedJob(object): return status def GetLogEntries(self, newer_than): + """Selectively returns the log entries. + + @type newer_than: None or int + @param newer_than: if this is None, return all log enties, + otherwise return only the log entries with serial higher + than this value + @rtype: list + @return: the list of the log entries selected + + """ if newer_than is None: serial = -1 else: @@ -185,12 +300,15 @@ class _QueuedJob(object): entries = [] for op in self.ops: - entries.extend(filter(lambda entry: entry[0] > newer_than, op.log)) + entries.extend(filter(lambda entry: entry[0] > serial, op.log)) return entries class _JobQueueWorker(workerpool.BaseWorker): + """The actual job workers. 
+ + """ def _NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -205,6 +323,13 @@ class _JobQueueWorker(workerpool.BaseWorker): self.queue.acquire() try: + assert self.opcode.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + + # Cancel here if we were asked to + if self.opcode.status == constants.OP_STATUS_CANCELING: + raise CancelJob() + self.opcode.status = constants.OP_STATUS_RUNNING finally: self.queue.release() @@ -215,8 +340,11 @@ class _JobQueueWorker(workerpool.BaseWorker): This functions processes a job. It is closely tied to the _QueuedJob and _QueuedOpCode classes. + @type job: L{_QueuedJob} + @param job: the job to be processed + """ - logging.debug("Worker %s processing job %s", + logging.info("Worker %s processing job %s", self.worker_id, job.id) proc = mcpu.Processor(self.pool.queue.context) self.queue = queue = job.queue @@ -224,11 +352,16 @@ class _JobQueueWorker(workerpool.BaseWorker): try: count = len(job.ops) for idx, op in enumerate(job.ops): + op_summary = op.input.Summary() try: - logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, + op_summary) queue.acquire() try: + if op.status == constants.OP_STATUS_CANCELED: + raise CancelJob() + assert op.status == constants.OP_STATUS_QUEUED job.run_op_index = idx op.status = constants.OP_STATUS_WAITLOCK op.result = None @@ -279,8 +412,11 @@ class _JobQueueWorker(workerpool.BaseWorker): finally: queue.release() - logging.debug("Op %s/%s: Successfully finished %s", - idx + 1, count, op) + logging.info("Op %s/%s: Successfully finished opcode %s", + idx + 1, count, op_summary) + except CancelJob: + # Will be handled further up + raise except Exception, err: queue.acquire() try: @@ -288,13 +424,20 @@ class _JobQueueWorker(workerpool.BaseWorker): op.status = constants.OP_STATUS_ERROR op.result = str(err) op.end_timestamp = TimeStampNow() - logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) + logging.info("Op %s/%s: Error in opcode %s: %s", + idx + 1, count, op_summary, err) finally: queue.UpdateJobUnlocked(job) finally: queue.release() raise + except CancelJob: + queue.acquire() + try: + queue.CancelJobUnlocked(job) + finally: + queue.release() except errors.GenericError, err: logging.exception("Ganeti exception") except: @@ -311,11 +454,14 @@ class _JobQueueWorker(workerpool.BaseWorker): status = job.CalcStatus() finally: queue.release() - logging.debug("Worker %s finished job %s, status = %s", - self.worker_id, job_id, status) + logging.info("Worker %s finished job %s, status = %s", + self.worker_id, job_id, status) class _JobQueueWorkerPool(workerpool.WorkerPool): + """Simple class implementing a job-processing workerpool. + + """ def __init__(self, queue): super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, _JobQueueWorker) @@ -323,17 +469,22 @@ class _JobQueueWorkerPool(workerpool.WorkerPool): class JobQueue(object): + """Quue used to manaage the jobs. + + @cvar _RE_JOB_FILE: regex matching the valid job file names + + """ _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) def _RequireOpenQueue(fn): """Decorator for "public" functions. - This function should be used for all "public" functions. That is, functions - usually called from other classes. + This function should be used for all 'public' functions. That is, + functions usually called from other classes. - Important: Use this decorator only after utils.LockedMethod! 
+ @warning: Use this decorator only after utils.LockedMethod! - Example: + Example:: @utils.LockedMethod @_RequireOpenQueue def Example(self): @@ -346,6 +497,18 @@ class JobQueue(object): return wrapper def __init__(self, context): + """Constructor for JobQueue. + + The constructor will initialize the job queue object and then + start loading the current jobs from disk, either for starting them + (if they were queue) or for aborting them (if they were already + running). + + @type context: GanetiContext + @param context: the context object for access to the configuration + data and other ganeti objects + + """ self.context = context self._memcache = weakref.WeakValueDictionary() self._my_hostname = utils.HostInfo().name @@ -365,44 +528,65 @@ class JobQueue(object): # Get initial list of nodes self._nodes = dict((n.name, n.primary_ip) - for n in self.context.cfg.GetAllNodesInfo().values()) + for n in self.context.cfg.GetAllNodesInfo().values() + if n.master_candidate) # Remove master node try: del self._nodes[self._my_hostname] - except ValueError: + except KeyError: pass # TODO: Check consistency across nodes # Setup worker pool self._wpool = _JobQueueWorkerPool(self) - - # We need to lock here because WorkerPool.AddTask() may start a job while - # we're still doing our work. - self.acquire() try: - for job in self._GetJobsUnlocked(None): - # a failure in loading the job can cause 'None' to be returned - if job is None: - continue + # We need to lock here because WorkerPool.AddTask() may start a job while + # we're still doing our work. + self.acquire() + try: + logging.info("Inspecting job queue") - status = job.CalcStatus() + all_job_ids = self._GetJobIDsUnlocked() + jobs_count = len(all_job_ids) + lastinfo = time.time() + for idx, job_id in enumerate(all_job_ids): + # Give an update every 1000 jobs or 10 seconds + if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or + idx == (jobs_count - 1)): + logging.info("Job queue inspection: %d/%d (%0.1f %%)", + idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) + lastinfo = time.time() - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask(job) + job = self._LoadJobUnlocked(job_id) - elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK): - logging.warning("Unfinished job %s found: %s", job.id, job) - try: - for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Unclean master daemon shutdown" - finally: - self.UpdateJobUnlocked(job) - finally: - self.release() + # a failure in loading the job can cause 'None' to be returned + if job is None: + continue + + status = job.CalcStatus() + + if status in (constants.JOB_STATUS_QUEUED, ): + self._wpool.AddTask(job) + + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING): + logging.warning("Unfinished job %s found: %s", job.id, job) + try: + for op in job.ops: + op.status = constants.OP_STATUS_ERROR + op.result = "Unclean master daemon shutdown" + finally: + self.UpdateJobUnlocked(job) + + logging.info("Job queue inspection finished") + finally: + self.release() + except: + self._wpool.TerminateWorkers() + raise @utils.LockedMethod @_RequireOpenQueue @@ -417,7 +601,13 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - RpcRunner.call_jobqueue_purge(node_name) + rpc.RpcRunner.call_jobqueue_purge(node_name) + + if not node.master_candidate: + # remove if existing, ignoring errors + self._nodes.pop(node_name, None) + # and skip 
the replication of the job ids + return # Upload the whole queue excluding archived jobs files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] @@ -433,8 +623,9 @@ class JobQueue(object): finally: fd.close() - result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip], - file_name, content) + result = rpc.RpcRunner.call_jobqueue_update([node_name], + [node.primary_ip], + file_name, content) if not result[node_name]: logging.error("Failed to upload %s to %s", file_name, node_name) @@ -443,6 +634,12 @@ class JobQueue(object): @utils.LockedMethod @_RequireOpenQueue def RemoveNode(self, node_name): + """Callback called when removing nodes from the cluster. + + @type node_name: str + @param node_name: the name of the node to remove + + """ try: # The queue is removed by the "leave node" RPC call. del self._nodes[node_name] @@ -450,6 +647,19 @@ class JobQueue(object): pass def _CheckRpcResult(self, result, nodes, failmsg): + """Verifies the status of an RPC call. + + Since we aim to keep consistency should this node (the current + master) fail, we will log errors if our rpc fail, and especially + log the case when more than half of the nodes failes. + + @param result: the data as returned from the rpc call + @type nodes: list + @param nodes: the list of nodes we made the call to + @type failmsg: str + @param failmsg: the identifier to be used for logging + + """ failed = [] success = [] @@ -470,6 +680,10 @@ class JobQueue(object): def _GetNodeIp(self): """Helper for returning the node name/ip list. + @rtype: (list, list) + @return: a tuple of two lists, the first one with the node + names and the second one with the node addresses + """ name_list = self._nodes.keys() addr_list = [self._nodes[name] for name in name_list] @@ -478,23 +692,54 @@ class JobQueue(object): def _WriteAndReplicateFileUnlocked(self, file_name, data): """Writes a file locally and then replicates it to all nodes. + This function will replace the contents of a file on the local + node and then replicate it to all the other nodes we have. + + @type file_name: str + @param file_name: the path of the file to be replicated + @type data: str + @param data: the new contents of the file + """ utils.WriteFile(file_name, data=data) names, addrs = self._GetNodeIp() - result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) - def _RenameFileUnlocked(self, old, new): - os.rename(old, new) + def _RenameFilesUnlocked(self, rename): + """Renames a file locally and then replicate the change. + + This function will rename a file in the local queue directory + and then replicate this rename to all the other nodes we have. + @type rename: list of (old, new) + @param rename: List containing tuples mapping old to new names + + """ + # Rename them locally + for old, new in rename: + utils.RenameFile(old, new, mkdir=True) + + # ... and on all nodes names, addrs = self._GetNodeIp() - result = RpcRunner.call_jobqueue_rename(names, addrs, old, new) - self._CheckRpcResult(result, self._nodes, - "Moving %s to %s" % (old, new)) + result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) def _FormatJobID(self, job_id): + """Convert a job ID to string format. 
+ + Currently this just does C{str(job_id)} after performing some + checks, but if we want to change the job id format this will + abstract this change. + + @type job_id: int or long + @param job_id: the numeric job id + @rtype: str + @return: the formatted job id + + """ if not isinstance(job_id, (int, long)): raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) if job_id < 0: @@ -502,12 +747,25 @@ class JobQueue(object): return str(job_id) + @classmethod + def _GetArchiveDirectory(cls, job_id): + """Returns the archive directory for a job. + + @type job_id: str + @param job_id: Job identifier + @rtype: str + @return: Directory name + + """ + return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) + def _NewSerialUnlocked(self): """Generates a new job identifier. Job identifiers are unique during the lifetime of a cluster. - Returns: A string representing the job identifier. + @rtype: str + @return: a string representing the job identifier. """ # New number @@ -524,14 +782,41 @@ class JobQueue(object): @staticmethod def _GetJobPath(job_id): + """Returns the job file for a given job id. + + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the job file + + """ return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id) - @staticmethod - def _GetArchivedJobPath(job_id): - return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id) + @classmethod + def _GetArchivedJobPath(cls, job_id): + """Returns the archived job file for a give job id. + + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the archived job file + + """ + path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id) + return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path) @classmethod def _ExtractJobID(cls, name): + """Extract the job id from a filename. + + @type name: str + @param name: the job filename + @rtype: job id or None + @return: the job id corresponding to the given filename, + or None if the filename does not represent a valid + job file + + """ m = cls._RE_JOB_FILE.match(name) if m: return m.group(1) @@ -548,16 +833,36 @@ class JobQueue(object): jobs are present on disk (so in the _memcache we don't have any extra IDs). + @rtype: list + @return: the list of job IDs + """ jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()] jlist = utils.NiceSort(jlist) return jlist def _ListJobFiles(self): + """Returns the list of current job files. + + @rtype: list + @return: the list of job file names + + """ return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR) if self._RE_JOB_FILE.match(name)] def _LoadJobUnlocked(self, job_id): + """Loads a job from the disk or memory. + + Given a job id, this will return the cached job object if + existing, or try to load the job from the disk. If loading from + disk, it will also add the job to the cache. + + @param job_id: the job id + @rtype: L{_QueuedJob} or None + @return: either None or the job object + + """ job = self._memcache.get(job_id, None) if job: logging.debug("Found job %s in memcache", job_id) @@ -586,7 +891,7 @@ class JobQueue(object): else: # non-archived case logging.exception("Can't parse job %s, will archive.", job_id) - self._RenameFileUnlocked(filepath, new_path) + self._RenameFilesUnlocked([(filepath, new_path)]) return None self._memcache[job_id] = job @@ -594,6 +899,15 @@ class JobQueue(object): return job def _GetJobsUnlocked(self, job_ids): + """Return a list of jobs based on their IDs. 
+ + @type job_ids: list + @param job_ids: either an empty list (meaning all jobs), + or a list of job IDs + @rtype: list + @return: the list of job objects + + """ if not job_ids: job_ids = self._GetJobIDsUnlocked() @@ -606,6 +920,9 @@ class JobQueue(object): This currently uses the queue drain file, which makes it a per-node flag. In the future this can be moved to the config file. + @rtype: boolean + @return: True of the job queue is marked for draining + """ return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) @@ -616,6 +933,9 @@ class JobQueue(object): This is similar to the function L{backend.JobQueueSetDrainFlag}, and in the future we might merge them. + @type drain_flag: boolean + @param drain_flag: wheter to set or unset the drain flag + """ if drain_flag: utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) @@ -623,9 +943,8 @@ class JobQueue(object): utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) return True - @utils.LockedMethod @_RequireOpenQueue - def SubmitJob(self, ops): + def _SubmitJobUnlocked(self, ops): """Create and store a new job. This enters the job into our job queue and also puts it on the new @@ -633,10 +952,25 @@ class JobQueue(object): @type ops: list @param ops: The list of OpCodes that will become the new job. + @rtype: job ID + @return: the job ID of the newly created job + @raise errors.JobQueueDrainError: if the job is marked for draining """ if self._IsQueueMarkedDrain(): - raise errors.JobQueueDrainError() + raise errors.JobQueueDrainError("Job queue is drained, refusing job") + + # Check job queue size + size = len(self._ListJobFiles()) + if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT: + # TODO: Autoarchive jobs. Make sure it's not done on every job + # submission, though. + #size = ... + pass + + if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: + raise errors.JobQueueFull() + # Get job identifier job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) @@ -652,8 +986,49 @@ class JobQueue(object): return job.id + @utils.LockedMethod + @_RequireOpenQueue + def SubmitJob(self, ops): + """Create and store a new job. + + @see: L{_SubmitJobUnlocked} + + """ + return self._SubmitJobUnlocked(ops) + + @utils.LockedMethod + @_RequireOpenQueue + def SubmitManyJobs(self, jobs): + """Create and store multiple jobs. + + @see: L{_SubmitJobUnlocked} + + """ + results = [] + for ops in jobs: + try: + data = self._SubmitJobUnlocked(ops) + status = True + except errors.GenericError, err: + data = str(err) + status = False + results.append((status, data)) + + return results + + @_RequireOpenQueue def UpdateJobUnlocked(self, job): + """Update a job's on disk storage. + + After a job has been modified, this function needs to be called in + order to write the changes to disk and replicate them to the other + nodes. 
+ + @type job: L{_QueuedJob} + @param job: the changed job + + """ filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) @@ -678,6 +1053,14 @@ class JobQueue(object): @param prev_log_serial: Last job message serial number @type timeout: float @param timeout: maximum time to wait + @rtype: tuple (job info, log entries) + @return: a tuple of the job information as required via + the fields parameter, and the log entries as a list + + if the job has not changed and the timeout has expired, + we instead return a special value, + L{constants.JOB_NOTCHANGED}, which should be interpreted + as such by the clients """ logging.debug("Waiting for changes in job %s", job_id) @@ -729,70 +1112,109 @@ class JobQueue(object): def CancelJob(self, job_id): """Cancels a job. + This will only succeed if the job has not started yet. + @type job_id: string - @param job_id: Job ID of job to be cancelled. + @param job_id: job ID of job to be cancelled. """ - logging.debug("Cancelling job %s", job_id) + logging.info("Cancelling job %s", job_id) job = self._LoadJobUnlocked(job_id) if not job: logging.debug("Job %s not found", job_id) - return + return (False, "Job %s not found" % job_id) + + job_status = job.CalcStatus() - if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,): + if job_status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITLOCK): logging.debug("Job %s is no longer in the queue", job.id) - return + return (False, "Job %s is no longer in the queue" % job.id) + + if job_status == constants.JOB_STATUS_QUEUED: + self.CancelJobUnlocked(job) + return (True, "Job %s canceled" % job.id) + + elif job_status == constants.JOB_STATUS_WAITLOCK: + # The worker will notice the new status and cancel the job + try: + for op in job.ops: + op.status = constants.OP_STATUS_CANCELING + finally: + self.UpdateJobUnlocked(job) + return (True, "Job %s will be canceled" % job.id) + + @_RequireOpenQueue + def CancelJobUnlocked(self, job): + """Marks a job as canceled. + """ try: for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Job cancelled by request" + op.status = constants.OP_STATUS_CANCELED + op.result = "Job canceled by request" finally: self.UpdateJobUnlocked(job) @_RequireOpenQueue - def _ArchiveJobUnlocked(self, job_id): - """Archives a job. + def _ArchiveJobsUnlocked(self, jobs): + """Archives jobs. - @type job_id: string - @param job_id: Job ID of job to be archived. + @type jobs: list of L{_QueuedJob} + @param jobs: Job objects + @rtype: int + @return: Number of archived jobs """ - logging.info("Archiving job %s", job_id) + archive_jobs = [] + rename_files = [] + for job in jobs: + if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, + constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR): + logging.debug("Job %s is not yet done", job.id) + continue - job = self._LoadJobUnlocked(job_id) - if not job: - logging.debug("Job %s not found", job_id) - return + archive_jobs.append(job) - if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, - constants.JOB_STATUS_SUCCESS, - constants.JOB_STATUS_ERROR): - logging.debug("Job %s is not yet done", job.id) - return + old = self._GetJobPath(job.id) + new = self._GetArchivedJobPath(job.id) + rename_files.append((old, new)) - old = self._GetJobPath(job.id) - new = self._GetArchivedJobPath(job.id) + # TODO: What if 1..n files fail to rename? 
+ self._RenameFilesUnlocked(rename_files) - self._RenameFileUnlocked(old, new) + logging.debug("Successfully archived job(s) %s", + ", ".join(job.id for job in archive_jobs)) - logging.debug("Successfully archived job %s", job.id) + return len(archive_jobs) @utils.LockedMethod @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. + This is just a wrapper over L{_ArchiveJobsUnlocked}. + @type job_id: string @param job_id: Job ID of job to be archived. + @rtype: bool + @return: Whether job was archived """ - return self._ArchiveJobUnlocked(job_id) + logging.info("Archiving job %s", job_id) + + job = self._LoadJobUnlocked(job_id) + if not job: + logging.debug("Job %s not found", job_id) + return False + + return self._ArchiveJobsUnlocked([job]) == 1 @utils.LockedMethod @_RequireOpenQueue - def AutoArchiveJobs(self, age): + def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. The method will archive all jobs which are older than the age @@ -807,24 +1229,58 @@ class JobQueue(object): logging.info("Archiving jobs with age more than %s seconds", age) now = time.time() - for jid in self._GetJobIDsUnlocked(archived=False): - job = self._LoadJobUnlocked(jid) - if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS, - constants.OP_STATUS_ERROR, - constants.OP_STATUS_CANCELED): - continue - if job.end_timestamp is None: - if job.start_timestamp is None: - job_age = job.received_timestamp + end_time = now + timeout + archived_count = 0 + last_touched = 0 + + all_job_ids = self._GetJobIDsUnlocked(archived=False) + pending = [] + for idx, job_id in enumerate(all_job_ids): + last_touched = idx + + # Not optimal because jobs could be pending + # TODO: Measure average duration for job archival and take number of + # pending jobs into account. + if time.time() > end_time: + break + + # Returns None if the job failed to load + job = self._LoadJobUnlocked(job_id) + if job: + if job.end_timestamp is None: + if job.start_timestamp is None: + job_age = job.received_timestamp + else: + job_age = job.start_timestamp else: - job_age = job.start_timestamp - else: - job_age = job.end_timestamp + job_age = job.end_timestamp + + if age == -1 or now - job_age[0] > age: + pending.append(job) + + # Archive 10 jobs at a time + if len(pending) >= 10: + archived_count += self._ArchiveJobsUnlocked(pending) + pending = [] + + if pending: + archived_count += self._ArchiveJobsUnlocked(pending) - if age == -1 or now - job_age[0] > age: - self._ArchiveJobUnlocked(jid) + return (archived_count, len(all_job_ids) - last_touched - 1) def _GetJobInfoUnlocked(self, job, fields): + """Returns information about a job. + + @type job: L{_QueuedJob} + @param job: the job which we query + @type fields: list + @param fields: names of fields to return + @rtype: list + @return: list with one element for each field + @raise errors.OpExecError: when an invalid field + has been passed + + """ row = [] for fname in fields: if fname == "id": @@ -860,9 +1316,16 @@ class JobQueue(object): def QueryJobs(self, job_ids, fields): """Returns a list of jobs in queue. - Args: - - job_ids: Sequence of job identifiers or None for all - - fields: Names of fields to return + This is a wrapper of L{_GetJobsUnlocked}, which actually does the + processing for each job. 
+
+    @type job_ids: list
+    @param job_ids: sequence of job identifiers or None for all
+    @type fields: list
+    @param fields: names of fields to return
+    @rtype: list
+    @return: a list with one element for each job, each element
+        being a list with the requested fields
 
     """
     jobs = []
@@ -880,6 +1343,8 @@
 
   def Shutdown(self):
     """Stops the job queue.
 
+    This shuts down all the worker threads and closes the queue.
+
     """
     self._wpool.TerminateWorkers()
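
For illustration, the archive layout introduced by this commit (JOBS_PER_ARCHIVE_DIRECTORY together with the new _GetArchiveDirectory and _GetArchivedJobPath helpers) can be reproduced with the short standalone sketch below. It is only a sketch of what the diff describes: the queue and archive directory values are assumptions standing in for constants.QUEUE_DIR and constants.JOB_QUEUE_ARCHIVE_DIR, which are defined elsewhere in Ganeti and not shown here, and the helper names JobPath/ArchivedJobPath are hypothetical.

# Illustrative sketch, not part of the commit above.
import os

JOBS_PER_ARCHIVE_DIRECTORY = 10000  # value introduced by this commit

QUEUE_DIR = "/var/lib/ganeti/queue"               # assumed stand-in for constants.QUEUE_DIR
ARCHIVE_DIR = os.path.join(QUEUE_DIR, "archive")  # assumed stand-in for constants.JOB_QUEUE_ARCHIVE_DIR


def JobPath(job_id):
  """Live job file, mirroring JobQueue._GetJobPath."""
  return os.path.join(QUEUE_DIR, "job-%s" % job_id)


def ArchivedJobPath(job_id):
  """Archived job file, mirroring JobQueue._GetArchivedJobPath.

  Archived jobs are grouped into one subdirectory per
  JOBS_PER_ARCHIVE_DIRECTORY job IDs; the diff computes this with
  integer division (int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY on
  Python 2), written as // here for clarity.

  """
  subdir = str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
  return os.path.join(ARCHIVE_DIR, subdir, "job-%s" % job_id)


if __name__ == "__main__":
  print JobPath("42")              # .../queue/job-42
  print ArchivedJobPath("42")      # .../queue/archive/0/job-42
  print ArchivedJobPath("123456")  # .../queue/archive/12/job-123456

Grouping archived jobs this way bounds each archive directory at 10000 entries, which is presumably why the commit replaces the old flat archive layout and why _RenameFilesUnlocked now uses utils.RenameFile(old, new, mkdir=True), so the per-group subdirectories are created on demand when a job is archived.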