628 |
628 |
|
629 |
629 |
# TODO: Check consistency across nodes
|
630 |
630 |
|
|
631 |
self._queue_size = 0
|
|
632 |
self._UpdateQueueSizeUnlocked()
|
|
633 |
self._drained = self._IsQueueMarkedDrain()
|
|
634 |
|
631 |
635 |
# Setup worker pool
|
632 |
636 |
self._wpool = _JobQueueWorkerPool(self)
|
633 |
637 |
try:
|
... | ... | |
997 |
1001 |
"""
|
998 |
1002 |
return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
|
999 |
1003 |
|
1000 |
|
@staticmethod
|
1001 |
|
def SetDrainFlag(drain_flag):
|
|
1004 |
def _UpdateQueueSizeUnlocked(self):
|
|
1005 |
"""Update the queue size.
|
|
1006 |
|
|
1007 |
"""
|
|
1008 |
self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
|
|
1009 |
|
|
1010 |
@utils.LockedMethod
|
|
1011 |
@_RequireOpenQueue
|
|
1012 |
def SetDrainFlag(self, drain_flag):
|
1002 |
1013 |
"""Sets the drain flag for the queue.
|
1003 |
1014 |
|
1004 |
1015 |
@type drain_flag: boolean
|
... | ... | |
1009 |
1020 |
utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
|
1010 |
1021 |
else:
|
1011 |
1022 |
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
|
|
1023 |
|
|
1024 |
self._drained = drain_flag
|
|
1025 |
|
1012 |
1026 |
return True
|
1013 |
1027 |
|
1014 |
1028 |
@_RequireOpenQueue
|
... | ... | |
1027 |
1041 |
@raise errors.JobQueueDrainError: if the job is marked for draining
|
1028 |
1042 |
|
1029 |
1043 |
"""
|
1030 |
|
if self._IsQueueMarkedDrain():
|
|
1044 |
# Ok when sharing the big job queue lock, as the drain file is created when
|
|
1045 |
# the lock is exclusive.
|
|
1046 |
if self._drained:
|
1031 |
1047 |
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
|
1032 |
1048 |
|
1033 |
|
# Check job queue size
|
1034 |
|
size = len(self._GetJobIDsUnlocked(sort=False))
|
1035 |
|
if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
|
1036 |
|
# TODO: Autoarchive jobs. Make sure it's not done on every job
|
1037 |
|
# submission, though.
|
1038 |
|
#size = ...
|
1039 |
|
pass
|
1040 |
|
|
1041 |
|
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
|
|
1049 |
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
|
1042 |
1050 |
raise errors.JobQueueFull()
|
1043 |
1051 |
|
1044 |
1052 |
job = _QueuedJob(self, job_id, ops)
|
... | ... | |
1046 |
1054 |
# Write to disk
|
1047 |
1055 |
self.UpdateJobUnlocked(job)
|
1048 |
1056 |
|
|
1057 |
self._queue_size += 1
|
|
1058 |
|
1049 |
1059 |
logging.debug("Adding new job %s to the cache", job_id)
|
1050 |
1060 |
self._memcache[job_id] = job
|
1051 |
1061 |
|
... | ... | |
1250 |
1260 |
logging.debug("Successfully archived job(s) %s",
|
1251 |
1261 |
utils.CommaJoin(job.id for job in archive_jobs))
|
1252 |
1262 |
|
|
1263 |
# Since we haven't quite checked, above, if we succeeded or failed renaming
|
|
1264 |
# the files, we update the cached queue size from the filesystem. When we
|
|
1265 |
# get around to fix the TODO: above, we can use the number of actually
|
|
1266 |
# archived jobs to fix this.
|
|
1267 |
self._UpdateQueueSizeUnlocked()
|
1253 |
1268 |
return len(archive_jobs)
|
1254 |
1269 |
|
1255 |
1270 |
@utils.LockedMethod
|