# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
+ def __repr__(self):
+ """Return a string representation of this object for debugging.
+
+ The result has the form "<MODULE.CLASS id=ID ops=SUMMARIES at 0xADDR>",
+ where SUMMARIES is the comma-separated result of calling Summary() on
+ the input of every entry in self.ops and ADDR is id(self) in
+ hexadecimal.
+
+ """
+ status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+ "id=%s" % self.id,
+ "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+ return "<%s at %#x>" % (" ".join(status), id(self))
+
@classmethod
def Restore(cls, queue, state):
"""Restore a _QueuedJob from serialized state:
@param job: the job to be processed
"""
- logging.info("Worker %s processing job %s",
- self.worker_id, job.id)
+ logging.info("Processing job %s", job.id)
proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue
try:
finally:
queue.release()
- logging.info("Worker %s finished job %s, status = %s",
- self.worker_id, job_id, status)
+ logging.info("Finished job %s, status = %s", job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool):
"""
def __init__(self, queue):
- super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+ super(_JobQueueWorkerPool, self).__init__("JobQueue",
+ JOBQUEUE_THREADS,
_JobQueueWorker)
self.queue = queue
msg = result[node].fail_msg
if msg:
failed.append(node)
- logging.error("RPC call %s failed on node %s: %s",
- result[node].call, node, msg)
+ logging.error("RPC call %s (%s) failed on node %s: %s",
+ result[node].call, failmsg, node, msg)
else:
success.append(node)
@return: the path to the job file
"""
- return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+ return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
@classmethod
def _GetArchivedJobPath(cls, job_id):
@return: the path to the archived job file
"""
- path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
- return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
+ return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+ cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
@classmethod
def _ExtractJobID(cls, name):
@return: the list of job IDs
"""
+ # pylint: disable-msg=W0613
jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
jlist = utils.NiceSort(jlist)
return jlist
all_job_ids = self._GetJobIDsUnlocked(archived=False)
pending = []
for idx, job_id in enumerate(all_job_ids):
- last_touched = idx
+ last_touched = idx + 1
# Not optimal because jobs could be pending
# TODO: Measure average duration for job archival and take number of
if pending:
archived_count += self._ArchiveJobsUnlocked(pending)
- return (archived_count, len(all_job_ids) - last_touched - 1)
+ return (archived_count, len(all_job_ids) - last_touched)
@staticmethod
def _GetJobInfoUnlocked(job, fields):