Revision 6bcb1446 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
1140 | 1140 |
as such by the clients |
1141 | 1141 |
|
1142 | 1142 |
""" |
1143 |
logging.debug("Waiting for changes in job %s", job_id) |
|
1144 |
|
|
1145 |
job_info = None |
|
1146 |
log_entries = None |
|
1147 |
|
|
1148 |
end_time = time.time() + timeout |
|
1149 |
while True: |
|
1150 |
delta_time = end_time - time.time() |
|
1151 |
if delta_time < 0: |
|
1152 |
return constants.JOB_NOTCHANGED |
|
1143 |
job = self._LoadJobUnlocked(job_id) |
|
1144 |
if not job: |
|
1145 |
logging.debug("Job %s not found", job_id) |
|
1146 |
return None |
|
1153 | 1147 |
|
1154 |
job = self._LoadJobUnlocked(job_id) |
|
1155 |
if not job: |
|
1156 |
logging.debug("Job %s not found", job_id) |
|
1157 |
break |
|
1148 |
def _CheckForChanges(): |
|
1149 |
logging.debug("Waiting for changes in job %s", job_id) |
|
1158 | 1150 |
|
1159 | 1151 |
status = job.CalcStatus() |
1160 | 1152 |
job_info = self._GetJobInfoUnlocked(job, fields) |
... | ... | |
1168 | 1160 |
job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
1169 | 1161 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
1170 | 1162 |
|
1171 |
if status not in (constants.JOB_STATUS_QUEUED, |
|
1172 |
constants.JOB_STATUS_RUNNING, |
|
1173 |
constants.JOB_STATUS_WAITLOCK): |
|
1174 |
# Don't even try to wait if the job is no longer running, there will be |
|
1175 |
# no changes. |
|
1176 |
break |
|
1177 |
|
|
1178 |
if (prev_job_info != job_info or |
|
1163 |
# Don't even try to wait if the job is no longer running, there will be |
|
1164 |
# no changes. |
|
1165 |
if (status not in (constants.JOB_STATUS_QUEUED, |
|
1166 |
constants.JOB_STATUS_RUNNING, |
|
1167 |
constants.JOB_STATUS_WAITLOCK) or |
|
1168 |
prev_job_info != job_info or |
|
1179 | 1169 |
(log_entries and prev_log_serial != log_entries[0][0])): |
1180 |
break |
|
1181 |
|
|
1182 |
logging.debug("Waiting again") |
|
1170 |
logging.debug("Job %s changed", job_id) |
|
1171 |
return (job_info, log_entries) |
|
1183 | 1172 |
|
1184 |
# Release the queue lock while waiting |
|
1185 |
job.change.wait(delta_time) |
|
1173 |
raise utils.RetryAgain() |
|
1186 | 1174 |
|
1187 |
logging.debug("Job %s changed", job_id)
|
|
1188 |
|
|
1189 |
if job_info is None and log_entries is None:
|
|
1190 |
return None
|
|
1191 |
else:
|
|
1192 |
return (job_info, log_entries)
|
|
1175 |
try:
|
|
1176 |
# Setting wait function to release the queue lock while waiting |
|
1177 |
return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
|
|
1178 |
wait_fn=job.change.wait)
|
|
1179 |
except utils.RetryTimeout:
|
|
1180 |
return constants.JOB_NOTCHANGED
|
|
1193 | 1181 |
|
1194 | 1182 |
@utils.LockedMethod |
1195 | 1183 |
@_RequireOpenQueue |
Also available in: Unified diff