Revision 5c735209
b/daemons/ganeti-masterd | ||
---|---|---|
217 | 217 |
return queue.ArchiveJob(job_id) |
218 | 218 |
|
219 | 219 |
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE: |
220 |
(job_id, fields, prev_job_info, prev_log_serial) = args |
|
220 |
(job_id, fields, prev_job_info, prev_log_serial, timeout) = args
|
|
221 | 221 |
return queue.WaitForJobChanges(job_id, fields, prev_job_info, |
222 |
prev_log_serial) |
|
222 |
prev_log_serial, timeout)
|
|
223 | 223 |
|
224 | 224 |
elif method == luxi.REQ_QUERY_JOBS: |
225 | 225 |
(job_ids, fields) = args |
b/lib/constants.py | ||
---|---|---|
279 | 279 |
|
280 | 280 |
JOB_ID_TEMPLATE = r"\d+" |
281 | 281 |
|
282 |
# unchanged job return |
|
283 |
JOB_NOTCHANGED = "nochange" |
|
284 |
|
|
282 | 285 |
# Job status |
283 | 286 |
JOB_STATUS_QUEUED = "queued" |
284 | 287 |
JOB_STATUS_RUNNING = "running" |
b/lib/jqueue.py | ||
---|---|---|
558 | 558 |
|
559 | 559 |
@utils.LockedMethod |
560 | 560 |
@_RequireOpenQueue |
561 |
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial): |
|
561 |
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, |
|
562 |
timeout): |
|
562 | 563 |
"""Waits for changes in a job. |
563 | 564 |
|
564 | 565 |
@type job_id: string |
... | ... | |
569 | 570 |
@param prev_job_info: Last job information returned |
570 | 571 |
@type prev_log_serial: int |
571 | 572 |
@param prev_log_serial: Last job message serial number |
573 |
@type timeout: float |
|
574 |
@param timeout: maximum time to wait |
|
572 | 575 |
|
573 | 576 |
""" |
574 | 577 |
logging.debug("Waiting for changes in job %s", job_id) |
575 |
|
|
578 |
end_time = time.time() + timeout |
|
576 | 579 |
while True: |
580 |
delta_time = end_time - time.time() |
|
581 |
if delta_time < 0: |
|
582 |
return constants.JOB_NOTCHANGED |
|
583 |
|
|
577 | 584 |
job = self._LoadJobUnlocked(job_id) |
578 | 585 |
if not job: |
579 | 586 |
logging.debug("Job %s not found", job_id) |
580 |
new_state = None |
|
581 | 587 |
break |
582 | 588 |
|
583 | 589 |
status = job.CalcStatus() |
... | ... | |
605 | 611 |
logging.debug("Waiting again") |
606 | 612 |
|
607 | 613 |
# Release the queue lock while waiting |
608 |
job.change.wait() |
|
614 |
job.change.wait(delta_time)
|
|
609 | 615 |
|
610 | 616 |
logging.debug("Job %s changed", job_id) |
611 | 617 |
|
b/lib/luxi.py | ||
---|---|---|
290 | 290 |
return self.CallMethod(REQ_ARCHIVE_JOB, job_id) |
291 | 291 |
|
292 | 292 |
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): |
293 |
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, |
|
294 |
(job_id, fields, prev_job_info, prev_log_serial)) |
|
293 |
timeout = (DEF_RWTO - 1) / 2 |
|
294 |
while True: |
|
295 |
result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, |
|
296 |
(job_id, fields, prev_job_info, |
|
297 |
prev_log_serial, timeout)) |
|
298 |
if result != constants.JOB_NOTCHANGED: |
|
299 |
break |
|
300 |
return result |
|
295 | 301 |
|
296 | 302 |
def QueryJobs(self, job_ids, fields): |
297 | 303 |
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields)) |
Also available in: Unified diff