Revision b95479a5 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
1 | 1 |
# |
2 | 2 |
# |
3 | 3 |
|
4 |
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. |
|
4 |
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
|
|
5 | 5 |
# |
6 | 6 |
# This program is free software; you can redistribute it and/or modify |
7 | 7 |
# it under the terms of the GNU General Public License as published by |
... | ... | |
34 | 34 |
import re |
35 | 35 |
import time |
36 | 36 |
import weakref |
37 |
import threading |
|
37 | 38 |
|
38 | 39 |
try: |
39 | 40 |
# pylint: disable-msg=E0611 |
... | ... | |
55 | 56 |
from ganeti import runtime |
56 | 57 |
from ganeti import netutils |
57 | 58 |
from ganeti import compat |
59 |
from ganeti import ht |
|
58 | 60 |
|
59 | 61 |
|
60 | 62 |
JOBQUEUE_THREADS = 25 |
... | ... | |
177 | 179 |
# pylint: disable-msg=W0212 |
178 | 180 |
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", |
179 | 181 |
"received_timestamp", "start_timestamp", "end_timestamp", |
180 |
"__weakref__"] |
|
182 |
"__weakref__", "processor_lock"]
|
|
181 | 183 |
|
182 | 184 |
def __init__(self, queue, job_id, ops): |
183 | 185 |
"""Constructor for the _QueuedJob. |
... | ... | |
211 | 213 |
""" |
212 | 214 |
obj.ops_iter = None |
213 | 215 |
obj.cur_opctx = None |
216 |
obj.processor_lock = threading.Lock() |
|
214 | 217 |
|
215 | 218 |
def __repr__(self): |
216 | 219 |
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), |
... | ... | |
804 | 807 |
self.log_prefix = log_prefix |
805 | 808 |
self.summary = op.input.Summary() |
806 | 809 |
|
810 |
# Create local copy to modify |
|
811 |
if getattr(op.input, opcodes.DEPEND_ATTR, None): |
|
812 |
self.jobdeps = op.input.depends[:] |
|
813 |
else: |
|
814 |
self.jobdeps = None |
|
815 |
|
|
807 | 816 |
self._timeout_strategy_factory = timeout_strategy_factory |
808 | 817 |
self._ResetTimeoutStrategy() |
809 | 818 |
|
... | ... | |
927 | 936 |
|
928 | 937 |
return update |
929 | 938 |
|
939 |
@staticmethod
def _CheckDependencies(queue, job, opctx):
  """Works through the pending job dependencies of an opcode.

  The dependencies listed in C{opctx.jobdeps} are checked one after the
  other, in order, using the queue's dependency manager. A dependency which
  is fulfilled is removed from the list; the first one which is not
  fulfilled ends the check.

  @type queue: L{JobQueue}
  @param queue: Queue object
  @type job: L{_QueuedJob}
  @param job: Job object
  @type opctx: L{_OpExecContext}
  @param opctx: Opcode execution context
  @rtype: bool
  @return: Whether opcode will be re-scheduled by dependency tracker

  """
  op = opctx.op

  while opctx.jobdeps:
    (dep_job_id, dep_status) = opctx.jobdeps[0]

    (verdict, message) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                       dep_status)
    assert ht.TNonEmptyString(message), "No dependency message"

    logging.info("%s: %s", opctx.log_prefix, message)

    if verdict == _JobDependencyManager.WAIT:
      # Need to wait for notification, dependency tracker will re-add job
      # to workerpool
      return True

    if verdict == _JobDependencyManager.CANCEL:
      # Dependency job was cancelled, cancel this job as well
      job.Cancel()
      assert op.status == constants.OP_STATUS_CANCELING
      return False

    if verdict in (_JobDependencyManager.WRONGSTATUS,
                   _JobDependencyManager.ERROR):
      # Dependency job failed or there was an error, this job must fail
      op.status = constants.OP_STATUS_ERROR
      op.result = _EncodeOpError(errors.OpExecError(message))
      return False

    if verdict != _JobDependencyManager.CONTINUE:
      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   verdict)

    # Dependency is fulfilled, drop it and look at the next one
    opctx.jobdeps.pop(0)

  # Nothing (left) to wait for
  return False
|
994 |
|
|
930 | 995 |
def _ExecOpCodeUnlocked(self, opctx): |
931 | 996 |
"""Processes one opcode and returns the result. |
932 | 997 |
|
... | ... | |
1013 | 1078 |
assert (op.priority <= constants.OP_PRIO_LOWEST and |
1014 | 1079 |
op.priority >= constants.OP_PRIO_HIGHEST) |
1015 | 1080 |
|
1081 |
waitjob = None |
|
1082 |
|
|
1016 | 1083 |
if op.status != constants.OP_STATUS_CANCELING: |
1017 | 1084 |
assert op.status in (constants.OP_STATUS_QUEUED, |
1018 | 1085 |
constants.OP_STATUS_WAITLOCK) |
... | ... | |
1025 | 1092 |
assert op.status == constants.OP_STATUS_WAITLOCK |
1026 | 1093 |
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK |
1027 | 1094 |
assert job.start_timestamp and op.start_timestamp |
1095 |
assert waitjob is None |
|
1096 |
|
|
1097 |
# Check if waiting for a job is necessary |
|
1098 |
waitjob = self._CheckDependencies(queue, job, opctx) |
|
1028 | 1099 |
|
1029 |
logging.info("%s: opcode %s waiting for locks", |
|
1030 |
opctx.log_prefix, opctx.summary) |
|
1100 |
assert op.status in (constants.OP_STATUS_WAITLOCK, |
|
1101 |
constants.OP_STATUS_CANCELING, |
|
1102 |
constants.OP_STATUS_ERROR) |
|
1031 | 1103 |
|
1032 |
queue.release() |
|
1033 |
try: |
|
1034 |
(op_status, op_result) = self._ExecOpCodeUnlocked(opctx) |
|
1035 |
finally: |
|
1036 |
queue.acquire(shared=1) |
|
1104 |
if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, |
|
1105 |
constants.OP_STATUS_ERROR)): |
|
1106 |
logging.info("%s: opcode %s waiting for locks", |
|
1107 |
opctx.log_prefix, opctx.summary) |
|
1037 | 1108 |
|
1038 |
op.status = op_status |
|
1039 |
op.result = op_result |
|
1109 |
assert not opctx.jobdeps, "Not all dependencies were removed" |
|
1110 |
|
|
1111 |
queue.release() |
|
1112 |
try: |
|
1113 |
(op_status, op_result) = self._ExecOpCodeUnlocked(opctx) |
|
1114 |
finally: |
|
1115 |
queue.acquire(shared=1) |
|
1116 |
|
|
1117 |
op.status = op_status |
|
1118 |
op.result = op_result |
|
1119 |
|
|
1120 |
assert not waitjob |
|
1040 | 1121 |
|
1041 | 1122 |
if op.status == constants.OP_STATUS_WAITLOCK: |
1042 | 1123 |
# Couldn't get locks in time |
... | ... | |
1051 | 1132 |
else: |
1052 | 1133 |
assert op.status in constants.OPS_FINALIZED |
1053 | 1134 |
|
1054 |
if op.status == constants.OP_STATUS_WAITLOCK: |
|
1135 |
if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
|
|
1055 | 1136 |
finalize = False |
1056 | 1137 |
|
1057 |
if opctx.CheckPriorityIncrease(): |
|
1138 |
if not waitjob and opctx.CheckPriorityIncrease():
|
|
1058 | 1139 |
# Priority was changed, need to update on-disk file |
1059 | 1140 |
queue.UpdateJobUnlocked(job) |
1060 | 1141 |
|
... | ... | |
1113 | 1194 |
# allowed. Once the file has been written, it can be archived anytime. |
1114 | 1195 |
queue.UpdateJobUnlocked(job) |
1115 | 1196 |
|
1197 |
assert not waitjob |
|
1198 |
|
|
1116 | 1199 |
if finalize: |
1117 | 1200 |
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) |
1201 |
# TODO: Check locking |
|
1202 |
queue.depmgr.NotifyWaiters(job.id) |
|
1118 | 1203 |
return True |
1119 | 1204 |
|
1120 |
return False |
|
1205 |
assert not waitjob or queue.depmgr.JobWaiting(job) |
|
1206 |
|
|
1207 |
return bool(waitjob) |
|
1121 | 1208 |
finally: |
1122 | 1209 |
queue.release() |
1123 | 1210 |
|
... | ... | |
1129 | 1216 |
def RunTask(self, job): # pylint: disable-msg=W0221
  """Job executor.

  Takes the job's C{processor_lock} and runs L{_RunTaskInner} while holding
  it. The lock is released again even if processing raises an exception.

  @type job: L{_QueuedJob}
  @param job: the job to be processed
  @return: Whatever L{_RunTaskInner} returns

  """
  # A job which registered for a dependency job can end up in the tasklist
  # more than once if the other job notifies before the first worker is
  # done. The per-job lock ensures only one worker is active on a single job.
  job_lock = job.processor_lock

  job_lock.acquire()
  try:
    result = self._RunTaskInner(job)
  finally:
    job_lock.release()

  return result
|
1231 |
|
|
1232 |
def _RunTaskInner(self, job): |
|
1233 |
"""Executes a job. |
|
1234 |
|
|
1235 |
Must be called with per-job lock acquired. |
|
1236 |
|
|
1237 |
""" |
|
1139 | 1238 |
queue = job.queue |
1140 | 1239 |
assert queue == self.pool.queue |
1141 | 1240 |
|
... | ... | |
1194 | 1293 |
self.queue = queue |
1195 | 1294 |
|
1196 | 1295 |
|
1296 |
class _JobDependencyManager: |
|
1297 |
"""Keeps track of job dependencies. |
|
1298 |
|
|
1299 |
""" |
|
1300 |
(WAIT, |
|
1301 |
ERROR, |
|
1302 |
CANCEL, |
|
1303 |
CONTINUE, |
|
1304 |
WRONGSTATUS) = range(1, 6) |
|
1305 |
|
|
1306 |
# TODO: Export waiter information to lock monitor |
|
1307 |
|
|
1308 |
def __init__(self, getstatus_fn, enqueue_fn): |
|
1309 |
"""Initializes this class. |
|
1310 |
|
|
1311 |
""" |
|
1312 |
self._getstatus_fn = getstatus_fn |
|
1313 |
self._enqueue_fn = enqueue_fn |
|
1314 |
|
|
1315 |
self._waiters = {} |
|
1316 |
self._lock = locking.SharedLock("JobDepMgr") |
|
1317 |
|
|
1318 |
@locking.ssynchronized(_LOCK, shared=1) |
|
1319 |
def JobWaiting(self, job): |
|
1320 |
"""Checks if a job is waiting. |
|
1321 |
|
|
1322 |
""" |
|
1323 |
return compat.any(job in jobs |
|
1324 |
for jobs in self._waiters.values()) |
|
1325 |
|
|
1326 |
@locking.ssynchronized(_LOCK) |
|
1327 |
def CheckAndRegister(self, job, dep_job_id, dep_status): |
|
1328 |
"""Checks if a dependency job has the requested status. |
|
1329 |
|
|
1330 |
If the other job is not yet in a finalized status, the calling job will be |
|
1331 |
notified (re-added to the workerpool) at a later point. |
|
1332 |
|
|
1333 |
@type job: L{_QueuedJob} |
|
1334 |
@param job: Job object |
|
1335 |
@type dep_job_id: string |
|
1336 |
@param dep_job_id: ID of dependency job |
|
1337 |
@type dep_status: list |
|
1338 |
@param dep_status: Required status |
|
1339 |
|
|
1340 |
""" |
|
1341 |
assert ht.TString(job.id) |
|
1342 |
assert ht.TString(dep_job_id) |
|
1343 |
assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) |
|
1344 |
|
|
1345 |
if job.id == dep_job_id: |
|
1346 |
return (self.ERROR, "Job can't depend on itself") |
|
1347 |
|
|
1348 |
# Get status of dependency job |
|
1349 |
try: |
|
1350 |
status = self._getstatus_fn(dep_job_id) |
|
1351 |
except errors.JobLost, err: |
|
1352 |
return (self.ERROR, "Dependency error: %s" % err) |
|
1353 |
|
|
1354 |
assert status in constants.JOB_STATUS_ALL |
|
1355 |
|
|
1356 |
job_id_waiters = self._waiters.setdefault(dep_job_id, set()) |
|
1357 |
|
|
1358 |
if status not in constants.JOBS_FINALIZED: |
|
1359 |
# Register for notification and wait for job to finish |
|
1360 |
job_id_waiters.add(job) |
|
1361 |
return (self.WAIT, |
|
1362 |
"Need to wait for job %s, wanted status '%s'" % |
|
1363 |
(dep_job_id, dep_status)) |
|
1364 |
|
|
1365 |
# Remove from waiters list |
|
1366 |
if job in job_id_waiters: |
|
1367 |
job_id_waiters.remove(job) |
|
1368 |
|
|
1369 |
if (status == constants.JOB_STATUS_CANCELED and |
|
1370 |
constants.JOB_STATUS_CANCELED not in dep_status): |
|
1371 |
return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) |
|
1372 |
|
|
1373 |
elif not dep_status or status in dep_status: |
|
1374 |
return (self.CONTINUE, |
|
1375 |
"Dependency job %s finished with status '%s'" % |
|
1376 |
(dep_job_id, status)) |
|
1377 |
|
|
1378 |
else: |
|
1379 |
return (self.WRONGSTATUS, |
|
1380 |
"Dependency job %s finished with status '%s'," |
|
1381 |
" not one of '%s' as required" % |
|
1382 |
(dep_job_id, status, utils.CommaJoin(dep_status))) |
|
1383 |
|
|
1384 |
@locking.ssynchronized(_LOCK) |
|
1385 |
def NotifyWaiters(self, job_id): |
|
1386 |
"""Notifies all jobs waiting for a certain job ID. |
|
1387 |
|
|
1388 |
@type job_id: string |
|
1389 |
@param job_id: Job ID |
|
1390 |
|
|
1391 |
""" |
|
1392 |
assert ht.TString(job_id) |
|
1393 |
|
|
1394 |
jobs = self._waiters.pop(job_id, None) |
|
1395 |
if jobs: |
|
1396 |
# Re-add jobs to workerpool |
|
1397 |
logging.debug("Re-adding %s jobs which were waiting for job %s", |
|
1398 |
len(jobs), job_id) |
|
1399 |
self._enqueue_fn(jobs) |
|
1400 |
|
|
1401 |
# Remove all jobs without actual waiters |
|
1402 |
for job_id in [job_id for (job_id, waiters) in self._waiters.items() |
|
1403 |
if not waiters]: |
|
1404 |
del self._waiters[job_id] |
|
1405 |
|
|
1406 |
|
|
1197 | 1407 |
def _RequireOpenQueue(fn): |
1198 | 1408 |
"""Decorator for "public" functions. |
1199 | 1409 |
|
... | ... | |
1277 | 1487 |
self._UpdateQueueSizeUnlocked() |
1278 | 1488 |
self._drained = jstore.CheckDrainFlag() |
1279 | 1489 |
|
1490 |
# Job dependencies |
|
1491 |
self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, |
|
1492 |
self._EnqueueJobs) |
|
1493 |
|
|
1280 | 1494 |
# Setup worker pool |
1281 | 1495 |
self._wpool = _JobQueueWorkerPool(self) |
1282 | 1496 |
try: |
... | ... | |
1808 | 2022 |
self._wpool.AddManyTasks([(job, ) for job in jobs], |
1809 | 2023 |
priority=[job.CalcPriority() for job in jobs]) |
1810 | 2024 |
|
2025 |
def _GetJobStatusForDependencies(self, job_id):
  """Gets the status of a job for dependencies.

  The job is loaded from disk and its status calculated from the loaded
  object.

  @type job_id: string
  @param job_id: Job ID; other values are converted using L{_FormatJobID}
  @return: Status as calculated by the job's C{CalcStatus} method
  @raise errors.JobLost: If job can't be found

  """
  if isinstance(job_id, basestring):
    wanted_id = job_id
  else:
    wanted_id = self._FormatJobID(job_id)

  # Not using in-memory cache as doing so would require an exclusive lock,
  # hence trying to load from disk
  # NOTE(review): meaning of the second argument is not visible here,
  # presumably it requests archived jobs to be considered too -- confirm
  loaded = self.SafeLoadJobFromDisk(wanted_id, True)

  if not loaded:
    raise errors.JobLost("Job %s not found" % wanted_id)

  return loaded.CalcStatus()
|
2045 |
|
|
1811 | 2046 |
@_RequireOpenQueue |
1812 | 2047 |
def UpdateJobUnlocked(self, job, replicate=True): |
1813 | 2048 |
"""Update a job's on disk storage. |
Also available in: Unified diff