Revision e92376d7 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
159 | 159 |
|
160 | 160 |
if op.status == constants.OP_STATUS_QUEUED: |
161 | 161 |
pass |
162 |
elif op.status == constants.OP_STATUS_WAITLOCK: |
|
163 |
status = constants.JOB_STATUS_WAITLOCK |
|
162 | 164 |
elif op.status == constants.OP_STATUS_RUNNING: |
163 | 165 |
status = constants.JOB_STATUS_RUNNING |
164 | 166 |
elif op.status == constants.OP_STATUS_ERROR: |
... | ... | |
188 | 190 |
|
189 | 191 |
|
190 | 192 |
class _JobQueueWorker(workerpool.BaseWorker): |
193 |
def _NotifyStart(self): |
|
194 |
"""Mark the opcode as running, not lock-waiting. |
|
195 |
|
|
196 |
This is called from the mcpu code as a notifier function, when the |
|
197 |
LU is finally about to start the Exec() method. Of course, to have |
|
198 |
end-user visible results, the opcode must be initially (before |
|
199 |
calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. |
|
200 |
|
|
201 |
""" |
|
202 |
assert self.queue, "Queue attribute is missing" |
|
203 |
assert self.opcode, "Opcode attribute is missing" |
|
204 |
|
|
205 |
self.queue.acquire() |
|
206 |
try: |
|
207 |
self.opcode.status = constants.OP_STATUS_RUNNING |
|
208 |
finally: |
|
209 |
self.queue.release() |
|
210 |
|
|
191 | 211 |
def RunTask(self, job): |
192 | 212 |
"""Job executor. |
193 | 213 |
|
... | ... | |
198 | 218 |
logging.debug("Worker %s processing job %s", |
199 | 219 |
self.worker_id, job.id) |
200 | 220 |
proc = mcpu.Processor(self.pool.queue.context) |
201 |
queue = job.queue |
|
221 |
self.queue = queue = job.queue
|
|
202 | 222 |
try: |
203 | 223 |
try: |
204 | 224 |
count = len(job.ops) |
... | ... | |
209 | 229 |
queue.acquire() |
210 | 230 |
try: |
211 | 231 |
job.run_op_index = idx |
212 |
op.status = constants.OP_STATUS_RUNNING
|
|
232 |
op.status = constants.OP_STATUS_WAITLOCK
|
|
213 | 233 |
op.result = None |
214 | 234 |
op.start_timestamp = TimeStampNow() |
215 | 235 |
if idx == 0: # first opcode |
... | ... | |
246 | 266 |
queue.release() |
247 | 267 |
|
248 | 268 |
# Make sure not to hold lock while _Log is called |
249 |
result = proc.ExecOpCode(input_opcode, _Log) |
|
269 |
self.opcode = op |
|
270 |
result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) |
|
250 | 271 |
|
251 | 272 |
queue.acquire() |
252 | 273 |
try: |
... | ... | |
365 | 386 |
if status in (constants.JOB_STATUS_QUEUED, ): |
366 | 387 |
self._wpool.AddTask(job) |
367 | 388 |
|
368 |
elif status in (constants.JOB_STATUS_RUNNING, ): |
|
389 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
390 |
constants.JOB_STATUS_WAITLOCK): |
|
369 | 391 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
370 | 392 |
try: |
371 | 393 |
for op in job.ops: |
... | ... | |
621 | 643 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
622 | 644 |
|
623 | 645 |
if status not in (constants.JOB_STATUS_QUEUED, |
624 |
constants.JOB_STATUS_RUNNING): |
|
646 |
constants.JOB_STATUS_RUNNING, |
|
647 |
constants.JOB_STATUS_WAITLOCK): |
|
625 | 648 |
# Don't even try to wait if the job is no longer running, there will be |
626 | 649 |
# no changes. |
627 | 650 |
break |
Also available in: Unified diff