1140 |
1140 |
as such by the clients
|
1141 |
1141 |
|
1142 |
1142 |
"""
|
1143 |
|
logging.debug("Waiting for changes in job %s", job_id)
|
1144 |
|
|
1145 |
|
job_info = None
|
1146 |
|
log_entries = None
|
1147 |
|
|
1148 |
|
end_time = time.time() + timeout
|
1149 |
|
while True:
|
1150 |
|
delta_time = end_time - time.time()
|
1151 |
|
if delta_time < 0:
|
1152 |
|
return constants.JOB_NOTCHANGED
|
|
1143 |
job = self._LoadJobUnlocked(job_id)
|
|
1144 |
if not job:
|
|
1145 |
logging.debug("Job %s not found", job_id)
|
|
1146 |
return None
|
1153 |
1147 |
|
1154 |
|
job = self._LoadJobUnlocked(job_id)
|
1155 |
|
if not job:
|
1156 |
|
logging.debug("Job %s not found", job_id)
|
1157 |
|
break
|
|
1148 |
def _CheckForChanges():
|
|
1149 |
logging.debug("Waiting for changes in job %s", job_id)
|
1158 |
1150 |
|
1159 |
1151 |
status = job.CalcStatus()
|
1160 |
1152 |
job_info = self._GetJobInfoUnlocked(job, fields)
|
... | ... | |
1168 |
1160 |
job_info = serializer.LoadJson(serializer.DumpJson(job_info))
|
1169 |
1161 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
|
1170 |
1162 |
|
1171 |
|
if status not in (constants.JOB_STATUS_QUEUED,
|
1172 |
|
constants.JOB_STATUS_RUNNING,
|
1173 |
|
constants.JOB_STATUS_WAITLOCK):
|
1174 |
|
# Don't even try to wait if the job is no longer running, there will be
|
1175 |
|
# no changes.
|
1176 |
|
break
|
1177 |
|
|
1178 |
|
if (prev_job_info != job_info or
|
|
1163 |
# Don't even try to wait if the job is no longer running, there will be
|
|
1164 |
# no changes.
|
|
1165 |
if (status not in (constants.JOB_STATUS_QUEUED,
|
|
1166 |
constants.JOB_STATUS_RUNNING,
|
|
1167 |
constants.JOB_STATUS_WAITLOCK) or
|
|
1168 |
prev_job_info != job_info or
|
1179 |
1169 |
(log_entries and prev_log_serial != log_entries[0][0])):
|
1180 |
|
break
|
1181 |
|
|
1182 |
|
logging.debug("Waiting again")
|
|
1170 |
logging.debug("Job %s changed", job_id)
|
|
1171 |
return (job_info, log_entries)
|
1183 |
1172 |
|
1184 |
|
# Release the queue lock while waiting
|
1185 |
|
job.change.wait(delta_time)
|
|
1173 |
raise utils.RetryAgain()
|
1186 |
1174 |
|
1187 |
|
logging.debug("Job %s changed", job_id)
|
1188 |
|
|
1189 |
|
if job_info is None and log_entries is None:
|
1190 |
|
return None
|
1191 |
|
else:
|
1192 |
|
return (job_info, log_entries)
|
|
1175 |
try:
|
|
1176 |
# Setting wait function to release the queue lock while waiting
|
|
1177 |
return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
|
|
1178 |
wait_fn=job.change.wait)
|
|
1179 |
except utils.RetryTimeout:
|
|
1180 |
return constants.JOB_NOTCHANGED
|
1193 |
1181 |
|
1194 |
1182 |
@utils.LockedMethod
|
1195 |
1183 |
@_RequireOpenQueue
|