Revision ef2df7d3 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
153 | 153 |
@ivar received_timestamp: the timestamp for when the job was received |
154 | 154 |
@ivar start_timestmap: the timestamp for start of execution |
155 | 155 |
@ivar end_timestamp: the timestamp for end of execution |
156 |
@ivar lock_status: In-memory locking information for debugging |
|
156 | 157 |
@ivar change: a Condition variable we use for waiting for job changes |
157 | 158 |
|
158 | 159 |
""" |
159 | 160 |
__slots__ = ["queue", "id", "ops", "run_op_index", "log_serial", |
160 | 161 |
"received_timestamp", "start_timestamp", "end_timestamp", |
161 |
"change", |
|
162 |
"lock_status", "change",
|
|
162 | 163 |
"__weakref__"] |
163 | 164 |
|
164 | 165 |
def __init__(self, queue, job_id, ops): |
... | ... | |
186 | 187 |
self.start_timestamp = None |
187 | 188 |
self.end_timestamp = None |
188 | 189 |
|
190 |
# In-memory attributes |
|
191 |
self.lock_status = None |
|
192 |
|
|
189 | 193 |
# Condition to wait for changes |
190 | 194 |
self.change = threading.Condition(self.queue._lock) |
191 | 195 |
|
... | ... | |
209 | 213 |
obj.start_timestamp = state.get("start_timestamp", None) |
210 | 214 |
obj.end_timestamp = state.get("end_timestamp", None) |
211 | 215 |
|
216 |
# In-memory attributes |
|
217 |
obj.lock_status = None |
|
218 |
|
|
212 | 219 |
obj.ops = [] |
213 | 220 |
obj.log_serial = 0 |
214 | 221 |
for op_state in state["ops"]: |
... | ... | |
334 | 341 |
not_marked = False |
335 | 342 |
|
336 | 343 |
|
337 |
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
|
|
344 |
class _OpExecCallbacks(mcpu.OpExecCbBase): |
|
338 | 345 |
def __init__(self, queue, job, op): |
339 | 346 |
"""Initializes this class. |
340 | 347 |
|
... | ... | |
368 | 375 |
assert self._op.status in (constants.OP_STATUS_WAITLOCK, |
369 | 376 |
constants.OP_STATUS_CANCELING) |
370 | 377 |
|
378 |
# All locks are acquired by now |
|
379 |
self._job.lock_status = None |
|
380 |
|
|
371 | 381 |
# Cancel here if we were asked to |
372 | 382 |
if self._op.status == constants.OP_STATUS_CANCELING: |
373 | 383 |
raise CancelJob() |
... | ... | |
401 | 411 |
finally: |
402 | 412 |
self._queue.release() |
403 | 413 |
|
414 |
def ReportLocks(self, msg): |
|
415 |
"""Write locking information to the job. |
|
416 |
|
|
417 |
Called whenever the LU processor is waiting for a lock or has acquired one. |
|
418 |
|
|
419 |
""" |
|
420 |
# Not getting the queue lock because this is a single assignment |
|
421 |
self._job.lock_status = msg |
|
422 |
|
|
404 | 423 |
|
405 | 424 |
class _JobQueueWorker(workerpool.BaseWorker): |
406 | 425 |
"""The actual job workers. |
... | ... | |
457 | 476 |
|
458 | 477 |
# Make sure not to hold queue lock while calling ExecOpCode |
459 | 478 |
result = proc.ExecOpCode(input_opcode, |
460 |
_OpCodeExecCallbacks(queue, job, op))
|
|
479 |
_OpExecCallbacks(queue, job, op)) |
|
461 | 480 |
|
462 | 481 |
queue.acquire() |
463 | 482 |
try: |
... | ... | |
505 | 524 |
queue.acquire() |
506 | 525 |
try: |
507 | 526 |
try: |
527 |
job.lock_status = None |
|
508 | 528 |
job.run_op_index = -1 |
509 | 529 |
job.end_timestamp = TimeStampNow() |
510 | 530 |
queue.UpdateJobUnlocked(job) |
... | ... | |
513 | 533 |
status = job.CalcStatus() |
514 | 534 |
finally: |
515 | 535 |
queue.release() |
536 |
|
|
516 | 537 |
logging.info("Worker %s finished job %s, status = %s", |
517 | 538 |
self.worker_id, job_id, status) |
518 | 539 |
|
... | ... | |
1081 | 1102 |
|
1082 | 1103 |
return results |
1083 | 1104 |
|
1084 |
|
|
1085 | 1105 |
@_RequireOpenQueue |
1086 | 1106 |
def UpdateJobUnlocked(self, job): |
1087 | 1107 |
"""Update a job's on disk storage. |
Also available in: Unified diff