projects
/
ganeti-local
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Add reset_env option to RunCmd
[ganeti-local]
/
lib
/
jqueue.py
diff --git
a/lib/jqueue.py
b/lib/jqueue.py
index
25f464e
..
7824d3f
100644
(file)
--- a/
lib/jqueue.py
+++ b/
lib/jqueue.py
@@
-190,6
+190,13
@@
class _QueuedJob(object):
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
+ def __repr__(self):
+ status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+ "id=%s" % self.id,
+ "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
+
+ return "<%s at %#x>" % (" ".join(status), id(self))
+
@classmethod
def Restore(cls, queue, state):
"""Restore a _QueuedJob from serialized state:
@classmethod
def Restore(cls, queue, state):
"""Restore a _QueuedJob from serialized state:
@@
-430,8
+437,7
@@
class _JobQueueWorker(workerpool.BaseWorker):
@param job: the job to be processed
"""
@param job: the job to be processed
"""
- logging.info("Worker %s processing job %s",
- self.worker_id, job.id)
+ logging.info("Processing job %s", job.id)
proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue
try:
proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue
try:
@@
-527,8
+533,7
@@
class _JobQueueWorker(workerpool.BaseWorker):
finally:
queue.release()
finally:
queue.release()
- logging.info("Worker %s finished job %s, status = %s",
- self.worker_id, job_id, status)
+ logging.info("Finished job %s, status = %s", job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool):
class _JobQueueWorkerPool(workerpool.WorkerPool):
@@
-536,7
+541,8
@@
class _JobQueueWorkerPool(workerpool.WorkerPool):
"""
def __init__(self, queue):
"""
def __init__(self, queue):
- super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
+ super(_JobQueueWorkerPool, self).__init__("JobQueue",
+ JOBQUEUE_THREADS,
_JobQueueWorker)
self.queue = queue
_JobQueueWorker)
self.queue = queue
@@
-725,7
+731,8
@@
class JobQueue(object):
except KeyError:
pass
except KeyError:
pass
- def _CheckRpcResult(self, result, nodes, failmsg):
+ @staticmethod
+ def _CheckRpcResult(result, nodes, failmsg):
"""Verifies the status of an RPC call.
Since we aim to keep consistency should this node (the current
"""Verifies the status of an RPC call.
Since we aim to keep consistency should this node (the current
@@
-746,8
+753,8
@@
class JobQueue(object):
msg = result[node].fail_msg
if msg:
failed.append(node)
msg = result[node].fail_msg
if msg:
failed.append(node)
- logging.error("RPC call %s failed on node %s: %s",
- result[node].call, node, msg)
+ logging.error("RPC call %s (%s) failed on node %s: %s",
+ result[node].call, failmsg, node, msg)
else:
success.append(node)
else:
success.append(node)
@@
-806,7
+813,8
@@
class JobQueue(object):
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
- def _FormatJobID(self, job_id):
+ @staticmethod
+ def _FormatJobID(job_id):
"""Convert a job ID to string format.
Currently this just does C{str(job_id)} after performing some
"""Convert a job ID to string format.
Currently this just does C{str(job_id)} after performing some
@@
-921,6
+929,7
@@
class JobQueue(object):
@return: the list of job IDs
"""
@return: the list of job IDs
"""
+ # pylint: disable-msg=W0613
jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
jlist = utils.NiceSort(jlist)
return jlist
jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
jlist = utils.NiceSort(jlist)
return jlist
@@
-1312,7
+1321,7
@@
class JobQueue(object):
all_job_ids = self._GetJobIDsUnlocked(archived=False)
pending = []
for idx, job_id in enumerate(all_job_ids):
all_job_ids = self._GetJobIDsUnlocked(archived=False)
pending = []
for idx, job_id in enumerate(all_job_ids):
- last_touched = idx
+ last_touched = idx + 1
# Not optimal because jobs could be pending
# TODO: Measure average duration for job archival and take number of
# Not optimal because jobs could be pending
# TODO: Measure average duration for job archival and take number of
@@
-1342,9
+1351,10
@@
class JobQueue(object):
if pending:
archived_count += self._ArchiveJobsUnlocked(pending)
if pending:
archived_count += self._ArchiveJobsUnlocked(pending)
- return (archived_count, len(all_job_ids) - last_touched - 1)
+ return (archived_count, len(all_job_ids) - last_touched)
- def _GetJobInfoUnlocked(self, job, fields):
+ @staticmethod
+ def _GetJobInfoUnlocked(job, fields):
"""Returns information about a job.
@type job: L{_QueuedJob}
"""Returns information about a job.
@type job: L{_QueuedJob}