X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1d2dcdfd59298fc8aad16d98ae59beaf8e05a4f6..bdefe5dd311f72b591a4dcf50e01715930d2f00d:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 23e1d3b..62a5f4a 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -145,9 +145,6 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
@@ -157,7 +154,8 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes

   """
-  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "lock_status", "change",
                "__weakref__"]
@@ -181,7 +179,6 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
@@ -193,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)

+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -208,7 +212,6 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
@@ -239,7 +242,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
       "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
       "start_timestamp": self.start_timestamp,
       "end_timestamp": self.end_timestamp,
       "received_timestamp": self.received_timestamp,
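The __repr__ added above identifies a job by module-qualified class name, job ID, and the Summary() of each opcode's input. A minimal sketch of how such a repr renders, using hypothetical _FakeJob/_FakeOp/_FakeInput stand-ins and a made-up opcode summary instead of the real _QueuedJob/_QueuedOpCode classes (Python 2, as in the codebase):

class _FakeInput(object):
  # Stand-in for an opcode input that only offers Summary()
  def __init__(self, summary):
    self._summary = summary

  def Summary(self):
    return self._summary

class _FakeOp(object):
  # Stand-in for _QueuedOpCode; only the .input attribute is needed here
  def __init__(self, summary):
    self.input = _FakeInput(summary)

class _FakeJob(object):
  # Stand-in for _QueuedJob, carrying the same __repr__ as the hunk above
  def __init__(self, job_id, ops):
    self.id = job_id
    self.ops = ops

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
    return "<%s at %#x>" % (" ".join(status), id(self))

print repr(_FakeJob(42, [_FakeOp("INSTANCE_QUERY"), _FakeOp("NODE_QUERY")]))
# Prints something like:
#   <__main__._FakeJob id=42 ops=INSTANCE_QUERY,NODE_QUERY at 0x7f...>

Keeping id(self) in the string preserves the default repr's ability to tell two job objects apart even when their fields compare equal.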
@@ -425,7 +427,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.

   """
-  def RunTask(self, job):
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.

     This function processes a job. It is closely tied to the _QueuedJob and
@@ -435,9 +437,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed

     """
-    logging.info("Worker %s processing job %s",
-                 self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
+    logging.info("Processing job %s", job.id)
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
       try:
@@ -462,7 +463,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
               if op.status == constants.OP_STATUS_CANCELED:
                 raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -525,7 +525,6 @@ class _JobQueueWorker(workerpool.BaseWorker):
       try:
         try:
           job.lock_status = None
-          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -534,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()

-    logging.info("Worker %s finished job %s, status = %s",
-                 self.worker_id, job_id, status)
+    logging.info("Finished job %s, status = %s", job_id, status)


 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -543,38 +541,44 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):

   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue


-class JobQueue(object):
-  """Queue used to manage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.

-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' attribute.

-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
+
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass

-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper

-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.

-    @warning: Use this decorator only after utils.LockedMethod!
+class JobQueue(object):
+  """Queue used to manage the jobs.

-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+  @cvar _RE_JOB_FILE: regex matching the valid job file names

-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

   def __init__(self, context):
     """Constructor for JobQueue.
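Hoisting _RequireOpenQueue from a method-local helper to module level lets it be documented once and applied as a plain decorator. Below is a self-contained sketch of the pattern with a hypothetical ExampleQueue standing in for the real JobQueue; utils.LockedMethod is deliberately left out, so only the open-queue assertion is exercised:

def _RequireOpenQueue(fn):
  # Refuse to run fn unless self._queue_lock is set, i.e. the queue is open
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper

class ExampleQueue(object):
  def __init__(self):
    # Stand-in for the lock object the real JobQueue holds while open
    self._queue_lock = object()

  def Close(self):
    self._queue_lock = None

  @_RequireOpenQueue
  def Example(self):
    return "queue is open"

q = ExampleQueue()
print q.Example()  # works while the queue is "open"
q.Close()
q.Example()        # now fails: AssertionError("Queue should be open")

The @warning about ordering matters because utils.LockedMethod, listed first, wraps the _RequireOpenQueue wrapper: the lock is taken before the open-queue check runs, not the other way around.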
@@ -681,7 +685,7 @@ class JobQueue(object):

       # Clean queue directory on added node
       result = rpc.RpcRunner.call_jobqueue_purge(node_name)
-      msg = result.RemoteFailMsg()
+      msg = result.fail_msg
       if msg:
         logging.warning("Cannot cleanup queue directory on node %s: %s",
                         node_name, msg)
@@ -705,7 +709,7 @@ class JobQueue(object):
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      msg = result[node_name].RemoteFailMsg()
+      msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
@@ -727,7 +731,8 @@ class JobQueue(object):
     except KeyError:
       pass

-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.

     Since we aim to keep consistency should this node (the current
@@ -745,11 +750,11 @@ class JobQueue(object):
     success = []

     for node in nodes:
-      msg = result[node].RemoteFailMsg()
+      msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)

@@ -808,7 +813,8 @@ class JobQueue(object):
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.

     Currently this just does C{str(job_id)} after performing some
@@ -876,7 +882,7 @@ class JobQueue(object):
     @return: the path to the job file

     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -888,8 +894,8 @@ class JobQueue(object):
     @return: the path to the archived job file

     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

   @classmethod
   def _ExtractJobID(cls, name):
@@ -923,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs

     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -967,7 +974,7 @@ class JobQueue(object):

     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -1035,7 +1042,7 @@ class JobQueue(object):
     queue, in order for it to be picked up by the queue processors.

     @type job_id: job ID
-    @param jod_id: the job ID for the new job
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
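With RemoteFailMsg() replaced by the fail_msg attribute and the failmsg argument now included in the log line, the per-node checking in _CheckRpcResult boils down to the loop sketched here (the _FakeRpcResult class is a hypothetical stand-in; real results come from rpc.RpcRunner calls):

import logging

class _FakeRpcResult(object):
  # Stand-in exposing the .call and .fail_msg attributes used above
  def __init__(self, call, fail_msg=None):
    self.call = call
    self.fail_msg = fail_msg

def _CheckRpcResult(result, nodes, failmsg):
  # Split nodes into successes and failures, logging each failure
  failed = []
  success = []
  for node in nodes:
    msg = result[node].fail_msg
    if msg:
      failed.append(node)
      logging.error("RPC call %s (%s) failed on node %s: %s",
                    result[node].call, failmsg, node, msg)
    else:
      success.append(node)
  return (success, failed)

result = {"node1": _FakeRpcResult("jobqueue_update"),
          "node2": _FakeRpcResult("jobqueue_update", "connection refused")}
print _CheckRpcResult(result, ["node1", "node2"], "Updating queue file")
# -> (['node1'], ['node2'])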
@@ -1148,21 +1155,13 @@ class JobQueue(object):
         as such by the clients

     """
-    logging.debug("Waiting for changes in job %s", job_id)
-
-    job_info = None
-    log_entries = None
-
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None

-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)

       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1176,28 +1175,24 @@ class JobQueue(object):
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
-
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)

-    logging.debug("Job %s changed", job_id)
+      raise utils.RetryAgain()

-    if job_info is None and log_entries is None:
-      return None
-    else:
-      return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED

   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1276,7 +1271,7 @@ class JobQueue(object):
       self._RenameFilesUnlocked(rename_files)

       logging.debug("Successfully archived job(s) %s",
-                    ", ".join(job.id for job in archive_jobs))
+                    utils.CommaJoin(job.id for job in archive_jobs))

     return len(archive_jobs)

@@ -1326,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1

       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1356,9 +1351,10 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)

-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)

-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.

     @type job: L{_QueuedJob}
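The WaitForJobChanges hunk above hands its polling loop to utils.Retry: the check function either returns a result or raises RetryAgain, and Retry calls wait_fn between attempts (here job.change.wait, which releases the queue lock while blocking) until the timeout expires. A simplified sketch of that contract, assuming a plain fixed-delay retry loop rather than ganeti's real utils.Retry and RETRY_REMAINING_TIME machinery:

import time

class RetryAgain(Exception):
  # Raised by the check function to request another attempt
  pass

class RetryTimeout(Exception):
  # Raised when the overall timeout expires
  pass

def Retry(fn, delay, timeout, wait_fn=time.sleep):
  # Call fn until it returns a value; wait via wait_fn between attempts
  end_time = time.time() + timeout
  while True:
    try:
      return fn()
    except RetryAgain:
      remaining = end_time - time.time()
      if remaining <= 0:
        raise RetryTimeout()
      wait_fn(min(delay, remaining))

# Toy usage: _FakeWait stands in for job.change.wait and flips the shared
# state after one wait, after which the check function returns a result.
_state = {"changed": False}

def _CheckForChanges():
  if not _state["changed"]:
    raise RetryAgain()
  return ("job_info", "log_entries")

def _FakeWait(seconds):
  time.sleep(min(seconds, 0.01))
  _state["changed"] = True

print Retry(_CheckForChanges, 0.5, 5.0, wait_fn=_FakeWait)
# -> ('job_info', 'log_entries'); a RetryTimeout maps to JOB_NOTCHANGED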