X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7e950d31c115e719d301f0fed9ede2977be0341a..bdefe5dd311f72b591a4dcf50e01715930d2f00d:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index ec2f52a..62a5f4a 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -190,6 +190,13 @@ class _QueuedJob(object):
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.id,
+              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
+
   @classmethod
   def Restore(cls, queue, state):
     """Restore a _QueuedJob from serialized state:
@@ -430,8 +437,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
     @param job: the job to be processed
 
     """
-    logging.info("Worker %s processing job %s",
-                 self.worker_id, job.id)
+    logging.info("Processing job %s", job.id)
     proc = mcpu.Processor(self.pool.queue.context, job.id)
     queue = job.queue
     try:
@@ -527,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       finally:
         queue.release()
 
-    logging.info("Worker %s finished job %s, status = %s",
-                 self.worker_id, job_id, status)
+    logging.info("Finished job %s, status = %s", job_id, status)
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -536,7 +541,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
   """
   def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+                                              JOBQUEUE_THREADS,
                                               _JobQueueWorker)
     self.queue = queue
 
@@ -747,8 +753,8 @@ class JobQueue(object):
       msg = result[node].fail_msg
       if msg:
         failed.append(node)
-        logging.error("RPC call %s failed on node %s: %s",
-                      result[node].call, node, msg)
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
       else:
         success.append(node)
 
@@ -876,7 +882,7 @@ class JobQueue(object):
     @return: the path to the job file
 
     """
-    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
 
   @classmethod
   def _GetArchivedJobPath(cls, job_id):
@@ -888,8 +894,8 @@ class JobQueue(object):
     @return: the path to the archived job file
 
     """
-    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
-    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
   @classmethod
   def _ExtractJobID(cls, name):
@@ -923,6 +929,7 @@ class JobQueue(object):
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -1314,7 +1321,7 @@ class JobQueue(object):
     all_job_ids = self._GetJobIDsUnlocked(archived=False)
     pending = []
     for idx, job_id in enumerate(all_job_ids):
-      last_touched = idx
+      last_touched = idx + 1
 
       # Not optimal because jobs could be pending
       # TODO: Measure average duration for job archival and take number of
@@ -1344,7 +1351,7 @@ class JobQueue(object):
     if pending:
       archived_count += self._ArchiveJobsUnlocked(pending)
 
-    return (archived_count, len(all_job_ids) - last_touched - 1)
+    return (archived_count, len(all_job_ids) - last_touched)
 
   @staticmethod
   def _GetJobInfoUnlocked(job, fields):
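
Note on the first hunk: the new __repr__ joins the fully qualified class name, the job id and the opcode summaries into one angle-bracketed string, so job objects interpolated into log messages become self-describing. The following standalone sketch only illustrates the output format of that pattern; _DemoJob, _DemoOp and _DemoOpInput are hypothetical stand-ins for _QueuedJob and its opcode wrappers (whose Summary() in Ganeti returns strings such as "INSTANCE_ADD(inst1.example.com)"), not part of the patch itself.

# Hypothetical stand-in for the per-opcode input object; in the real code
# op.input.Summary() comes from the opcode classes in lib/opcodes.py.
class _DemoOpInput(object):
  def __init__(self, summary):
    self._summary = summary

  def Summary(self):
    return self._summary


class _DemoOp(object):
  def __init__(self, summary):
    self.input = _DemoOpInput(summary)


class _DemoJob(object):
  """Standalone illustration of the __repr__ pattern added in the hunk above."""

  def __init__(self, job_id, ops):
    self.id = job_id
    self.ops = ops

  def __repr__(self):
    # Same construction as the patch: module.ClassName, id=..., ops=...
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
    return "<%s at %#x>" % (" ".join(status), id(self))


print(repr(_DemoJob(123, [_DemoOp("INSTANCE_ADD(inst1.example.com)")])))
# Prints something like:
# <__main__._DemoJob id=123 ops=INSTANCE_ADD(inst1.example.com) at 0x7f...>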