Revision daba67c7
b/lib/jqueue.py | ||
---|---|---|
690 | 690 |
@param job: the job to be processed |
691 | 691 |
|
692 | 692 |
""" |
693 |
self.SetTaskName("Job%s" % job.id) |
|
694 |
|
|
693 | 695 |
logging.info("Processing job %s", job.id) |
694 | 696 |
proc = mcpu.Processor(self.pool.queue.context, job.id) |
695 | 697 |
queue = job.queue |
b/lib/workerpool.py | ||
---|---|---|
49 | 49 |
""" |
50 | 50 |
super(BaseWorker, self).__init__(name=worker_id) |
51 | 51 |
self.pool = pool |
52 |
self._worker_id = worker_id |
|
52 | 53 |
self._current_task = None |
53 | 54 |
|
55 |
assert self.getName() == worker_id |
|
56 |
|
|
54 | 57 |
def ShouldTerminate(self): |
55 | 58 |
"""Returns whether this worker should terminate. |
56 | 59 |
|
... | ... | |
64 | 67 |
finally: |
65 | 68 |
self.pool._lock.release() |
66 | 69 |
|
70 |
def SetTaskName(self, taskname): |
|
71 |
"""Sets the name of the current task. |
|
72 |
|
|
73 |
Should only be called from within L{RunTask}. |
|
74 |
|
|
75 |
@type taskname: string |
|
76 |
@param taskname: Task's name |
|
77 |
|
|
78 |
""" |
|
79 |
if taskname: |
|
80 |
name = "%s/%s" % (self._worker_id, taskname) |
|
81 |
else: |
|
82 |
name = self._worker_id |
|
83 |
|
|
84 |
# Set thread name |
|
85 |
self.setName(name) |
|
86 |
|
|
67 | 87 |
def _HasRunningTaskUnlocked(self): |
68 | 88 |
"""Returns whether this worker is currently running a task. |
69 | 89 |
|
... | ... | |
107 | 127 |
# Run the actual task |
108 | 128 |
try: |
109 | 129 |
logging.debug("Starting task %r", self._current_task) |
110 |
self.RunTask(*self._current_task) |
|
130 |
assert self.getName() == self._worker_id |
|
131 |
try: |
|
132 |
self.RunTask(*self._current_task) |
|
133 |
finally: |
|
134 |
self.SetTaskName(None) |
|
111 | 135 |
logging.debug("Done with task %r", self._current_task) |
112 | 136 |
except: # pylint: disable-msg=W0702 |
113 | 137 |
logging.exception("Caught unhandled exception") |
b/test/ganeti.workerpool_unittest.py | ||
---|---|---|
76 | 76 |
|
77 | 77 |
class ChecksumBaseWorker(workerpool.BaseWorker): |
78 | 78 |
def RunTask(self, ctx, number): |
79 |
name = "number%s" % number |
|
80 |
self.SetTaskName(name) |
|
81 |
|
|
82 |
# This assertion needs to be checked before updating the checksum. A |
|
83 |
# failing assertion will then cause the result to be wrong. |
|
84 |
assert self.getName() == ("%s/%s" % (self._worker_id, name)) |
|
85 |
|
|
79 | 86 |
ctx.lock.acquire() |
80 | 87 |
try: |
81 | 88 |
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) |
Also available in: Unified diff