Revision 20571a26 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
628 | 628 |
|
629 | 629 |
# TODO: Check consistency across nodes |
630 | 630 |
|
631 |
self._queue_size = 0 |
|
632 |
self._UpdateQueueSizeUnlocked() |
|
633 |
self._drained = self._IsQueueMarkedDrain() |
|
634 |
|
|
631 | 635 |
# Setup worker pool |
632 | 636 |
self._wpool = _JobQueueWorkerPool(self) |
633 | 637 |
try: |
... | ... | |
997 | 1001 |
""" |
998 | 1002 |
return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) |
999 | 1003 |
|
1000 |
@staticmethod |
|
1001 |
def SetDrainFlag(drain_flag): |
|
1004 |
def _UpdateQueueSizeUnlocked(self): |
|
1005 |
"""Update the queue size. |
|
1006 |
|
|
1007 |
""" |
|
1008 |
self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) |
|
1009 |
|
|
1010 |
@utils.LockedMethod |
|
1011 |
@_RequireOpenQueue |
|
1012 |
def SetDrainFlag(self, drain_flag): |
|
1002 | 1013 |
"""Sets the drain flag for the queue. |
1003 | 1014 |
|
1004 | 1015 |
@type drain_flag: boolean |
... | ... | |
1009 | 1020 |
utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) |
1010 | 1021 |
else: |
1011 | 1022 |
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) |
1023 |
|
|
1024 |
self._drained = drain_flag |
|
1025 |
|
|
1012 | 1026 |
return True |
1013 | 1027 |
|
1014 | 1028 |
@_RequireOpenQueue |
... | ... | |
1027 | 1041 |
@raise errors.JobQueueDrainError: if the job is marked for draining |
1028 | 1042 |
|
1029 | 1043 |
""" |
1030 |
if self._IsQueueMarkedDrain(): |
|
1044 |
# Ok when sharing the big job queue lock, as the drain file is created when |
|
1045 |
# the lock is exclusive. |
|
1046 |
if self._drained: |
|
1031 | 1047 |
raise errors.JobQueueDrainError("Job queue is drained, refusing job") |
1032 | 1048 |
|
1033 |
# Check job queue size |
|
1034 |
size = len(self._GetJobIDsUnlocked(sort=False)) |
|
1035 |
if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT: |
|
1036 |
# TODO: Autoarchive jobs. Make sure it's not done on every job |
|
1037 |
# submission, though. |
|
1038 |
#size = ... |
|
1039 |
pass |
|
1040 |
|
|
1041 |
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: |
|
1049 |
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: |
|
1042 | 1050 |
raise errors.JobQueueFull() |
1043 | 1051 |
|
1044 | 1052 |
job = _QueuedJob(self, job_id, ops) |
... | ... | |
1046 | 1054 |
# Write to disk |
1047 | 1055 |
self.UpdateJobUnlocked(job) |
1048 | 1056 |
|
1057 |
self._queue_size += 1 |
|
1058 |
|
|
1049 | 1059 |
logging.debug("Adding new job %s to the cache", job_id) |
1050 | 1060 |
self._memcache[job_id] = job |
1051 | 1061 |
|
... | ... | |
1250 | 1260 |
logging.debug("Successfully archived job(s) %s", |
1251 | 1261 |
utils.CommaJoin(job.id for job in archive_jobs)) |
1252 | 1262 |
|
1263 |
# Since we haven't quite checked, above, if we succeeded or failed renaming |
|
1264 |
# the files, we update the cached queue size from the filesystem. When we |
|
1265 |
# get around to fix the TODO: above, we can use the number of actually |
|
1266 |
# archived jobs to fix this. |
|
1267 |
self._UpdateQueueSizeUnlocked() |
|
1253 | 1268 |
return len(archive_jobs) |
1254 | 1269 |
|
1255 | 1270 |
@utils.LockedMethod |
Also available in: Unified diff