Revision ac0930b9
b/lib/jqueue.py | ||
---|---|---|
227 | 227 |
|
228 | 228 |
def __init__(self): |
229 | 229 |
self._lock = threading.Lock() |
230 |
self._memcache = {} |
|
230 | 231 |
|
231 | 232 |
# Make sure our directory exists |
232 | 233 |
try: |
... | ... | |
319 | 320 |
If the parameter archived is True, archived jobs IDs will be |
320 | 321 |
included. Currently this argument is unused. |
321 | 322 |
|
323 |
The method only looks at disk because it's a requirement that all |
|
324 |
jobs are present on disk (so in the _memcache we don't have any |
|
325 |
extra IDs). |
|
326 |
|
|
322 | 327 |
""" |
323 | 328 |
jfiles = self._ListJobFiles() |
324 | 329 |
return [int(m.group(1)) for m in |
... | ... | |
333 | 338 |
def _LoadJobUnlocked(self, job_id): |
334 | 339 |
assert self.lock_fd, "Queue should be open" |
335 | 340 |
|
341 |
if job_id in self._memcache: |
|
342 |
logging.debug("Found job %d in memcache", job_id) |
|
343 |
return self._memcache[job_id] |
|
344 |
|
|
336 | 345 |
filepath = self._GetJobPath(job_id) |
337 | 346 |
logging.debug("Loading job from %s", filepath) |
338 | 347 |
try: |
... | ... | |
346 | 355 |
finally: |
347 | 356 |
fd.close() |
348 | 357 |
|
349 |
return _QueuedJob.Restore(self, data) |
|
358 |
job = _QueuedJob.Restore(self, data) |
|
359 |
self._memcache[job_id] = job |
|
360 |
logging.debug("Added job %d to the cache", job_id) |
|
361 |
return job |
|
350 | 362 |
|
351 | 363 |
def _GetJobsUnlocked(self, job_ids): |
352 | 364 |
if not job_ids: |
... | ... | |
369 | 381 |
# Write to disk |
370 | 382 |
self._UpdateJobUnlocked(job) |
371 | 383 |
|
384 |
logging.debug("Added new job %d to the cache", job_id) |
|
385 |
self._memcache[job_id] = job |
|
386 |
|
|
372 | 387 |
return job |
373 | 388 |
|
374 | 389 |
def _UpdateJobUnlocked(self, job): |
... | ... | |
378 | 393 |
logging.debug("Writing job %s to %s", job.id, filename) |
379 | 394 |
utils.WriteFile(filename, |
380 | 395 |
data=serializer.DumpJson(job.Serialize(), indent=False)) |
396 |
self._CleanCacheUnlocked(exceptions=[job.id]) |
|
397 |
|
|
398 |
def _CleanCacheUnlocked(self, exceptions=None): |
|
399 |
"""Clean the memory cache. |
|
400 |
|
|
401 |
The exceptions argument contains job IDs that should not be |
|
402 |
cleaned. |
|
403 |
|
|
404 |
""" |
|
405 |
assert isinstance(exceptions, list) |
|
406 |
for job in self._memcache.values(): |
|
407 |
if job.id in exceptions: |
|
408 |
continue |
|
409 |
if job.GetStatus() not in (constants.JOB_STATUS_QUEUED, |
|
410 |
constants.JOB_STATUS_RUNNING): |
|
411 |
logging.debug("Cleaning job %d from the cache", job.id) |
|
412 |
try: |
|
413 |
del self._memcache[job.id] |
|
414 |
except KeyError: |
|
415 |
pass |
|
381 | 416 |
|
382 | 417 |
@utils.LockedMethod |
383 | 418 |
def UpdateJob(self, job): |
Also available in: Unified diff