Revision d7fd1f28
b/lib/jqueue.py | ||
---|---|---|
703 | 703 |
self._CheckRpcResult(result, self._nodes, |
704 | 704 |
"Updating %s" % file_name) |
705 | 705 |
|
706 |
def _RenameFileUnlocked(self, old, new):
|
|
706 |
def _RenameFilesUnlocked(self, rename):
|
|
707 | 707 |
"""Renames files locally and then replicates the changes. |
708 | 708 |
|
709 | 709 |
This function will rename files in the local queue directory |
710 | 710 |
and then replicate these renames to all the other nodes we have. |
711 | 711 |
|
712 |
@type old: str |
|
713 |
@param old: the current name of the file |
|
714 |
@type new: str |
|
715 |
@param new: the new name of the file |
|
712 |
@type rename: list of (old, new) |
|
713 |
@param rename: List containing tuples mapping old to new names |
|
716 | 714 |
|
717 | 715 |
""" |
718 |
utils.RenameFile(old, new, mkdir=True) |
|
716 |
for old, new in rename: |
|
717 |
utils.RenameFile(old, new, mkdir=True) |
|
719 | 718 |
|
720 |
names, addrs = self._GetNodeIp() |
|
721 |
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new) |
|
722 |
self._CheckRpcResult(result, self._nodes, |
|
723 |
"Moving %s to %s" % (old, new)) |
|
719 |
names, addrs = self._GetNodeIp()
|
|
720 |
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
|
|
721 |
self._CheckRpcResult(result, self._nodes,
|
|
722 |
"Moving %s to %s" % (old, new))
|
|
724 | 723 |
|
725 | 724 |
def _FormatJobID(self, job_id): |
726 | 725 |
"""Convert a job ID to string format. |
... | ... | |
886 | 885 |
else: |
887 | 886 |
# non-archived case |
888 | 887 |
logging.exception("Can't parse job %s, will archive.", job_id) |
889 |
self._RenameFileUnlocked(filepath, new_path)
|
|
888 |
self._RenameFilesUnlocked([(filepath, new_path)])
|
|
890 | 889 |
return None |
891 | 890 |
|
892 | 891 |
self._memcache[job_id] = job |
... | ... | |
1123 | 1122 |
self.UpdateJobUnlocked(job) |
1124 | 1123 |
|
1125 | 1124 |
@_RequireOpenQueue |
1126 |
def _ArchiveJobUnlocked(self, job):
|
|
1127 |
"""Archives a job.
|
|
1125 |
def _ArchiveJobsUnlocked(self, jobs):
|
|
1126 |
"""Archives jobs.
|
|
1128 | 1127 |
|
1129 |
@type job: L{_QueuedJob}
|
|
1130 |
@param job: Job object |
|
1131 |
@rtype bool
|
|
1132 |
@return Whether job was archived
|
|
1128 |
@type jobs: list of L{_QueuedJob}
|
|
1129 |
@param jobs: Job objects |
|
|
1130 |
@rtype: int
|
|
1131 |
@return: Number of archived jobs
|
|
1133 | 1132 |
|
1134 | 1133 |
""" |
1135 |
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, |
|
1136 |
constants.JOB_STATUS_SUCCESS, |
|
1137 |
constants.JOB_STATUS_ERROR): |
|
1138 |
logging.debug("Job %s is not yet done", job.id) |
|
1139 |
return False |
|
1134 |
archive_jobs = [] |
|
1135 |
rename_files = [] |
|
1136 |
for job in jobs: |
|
1137 |
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, |
|
1138 |
constants.JOB_STATUS_SUCCESS, |
|
1139 |
constants.JOB_STATUS_ERROR): |
|
1140 |
logging.debug("Job %s is not yet done", job.id) |
|
1141 |
continue |
|
1140 | 1142 |
|
1141 |
old = self._GetJobPath(job.id) |
|
1142 |
new = self._GetArchivedJobPath(job.id) |
|
1143 |
archive_jobs.append(job) |
|
1143 | 1144 |
|
1144 |
self._RenameFileUnlocked(old, new) |
|
1145 |
old = self._GetJobPath(job.id) |
|
1146 |
new = self._GetArchivedJobPath(job.id) |
|
1147 |
rename_files.append((old, new)) |
|
1145 | 1148 |
|
1146 |
logging.debug("Successfully archived job %s", job.id) |
|
1149 |
# TODO: What if 1..n files fail to rename? |
|
1150 |
self._RenameFilesUnlocked(rename_files) |
|
1147 | 1151 |
|
1148 |
return True |
|
1152 |
logging.debug("Successfully archived job(s) %s", |
|
1153 |
", ".join(job.id for job in archive_jobs)) |
|
1154 |
|
|
1155 |
return len(archive_jobs) |
|
1149 | 1156 |
|
1150 | 1157 |
@utils.LockedMethod |
1151 | 1158 |
@_RequireOpenQueue |
... | ... | |
1167 | 1174 |
logging.debug("Job %s not found", job_id) |
1168 | 1175 |
return False |
1169 | 1176 |
|
1170 |
return self._ArchiveJobUnlocked(job)
|
|
1177 |
return self._ArchiveJobUnlocked([job]) == 1
|
|
1171 | 1178 |
|
1172 | 1179 |
@utils.LockedMethod |
1173 | 1180 |
@_RequireOpenQueue |
... | ... | |
1191 | 1198 |
last_touched = 0 |
1192 | 1199 |
|
1193 | 1200 |
all_job_ids = self._GetJobIDsUnlocked(archived=False) |
1201 |
pending = [] |
|
1194 | 1202 |
for idx, job_id in enumerate(all_job_ids): |
1195 | 1203 |
last_touched = idx |
1196 | 1204 |
|
1205 |
# Not optimal because jobs could be pending |
|
1206 |
# TODO: Measure average duration for job archival and take number of |
|
1207 |
# pending jobs into account. |
|
1197 | 1208 |
if time.time() > end_time: |
1198 | 1209 |
break |
1199 | 1210 |
|
... | ... | |
1209 | 1220 |
job_age = job.end_timestamp |
1210 | 1221 |
|
1211 | 1222 |
if age == -1 or now - job_age[0] > age: |
1212 |
archived = self._ArchiveJobUnlocked(job) |
|
1213 |
if archived: |
|
1214 |
archived_count += 1 |
|
1215 |
continue |
|
1223 |
pending.append(job) |
|
1224 |
|
|
1225 |
# Archive 10 jobs at a time |
|
1226 |
if len(pending) >= 10: |
|
1227 |
archived_count += self._ArchiveJobsUnlocked(pending) |
|
1228 |
pending = [] |
|
1216 | 1229 |
|
1230 |
if pending: |
|
1231 |
archived_count += self._ArchiveJobsUnlocked(pending) |
|
1217 | 1232 |
|
1218 | 1233 |
return (archived_count, len(all_job_ids) - last_touched - 1) |
1219 | 1234 |
|
Also available in: Unified diff