Revision ac0930b9 lib/jqueue.py

b/lib/jqueue.py
227 227

  
228 228
  def __init__(self):
229 229
    self._lock = threading.Lock()
230
    self._memcache = {}
230 231

  
231 232
    # Make sure our directory exists
232 233
    try:
......
319 320
    If the parameter archived is True, archived jobs IDs will be
320 321
    included. Currently this argument is unused.
321 322

  
323
    The method only looks at disk because it's a requirement that all
324
    jobs are present on disk (so in the _memcache we don't have any
325
    extra IDs).
326

  
322 327
    """
323 328
    jfiles = self._ListJobFiles()
324 329
    return [int(m.group(1)) for m in
......
333 338
  def _LoadJobUnlocked(self, job_id):
334 339
    assert self.lock_fd, "Queue should be open"
335 340

  
341
    if job_id in self._memcache:
342
      logging.debug("Found job %d in memcache", job_id)
343
      return self._memcache[job_id]
344

  
336 345
    filepath = self._GetJobPath(job_id)
337 346
    logging.debug("Loading job from %s", filepath)
338 347
    try:
......
346 355
    finally:
347 356
      fd.close()
348 357

  
349
    return _QueuedJob.Restore(self, data)
358
    job = _QueuedJob.Restore(self, data)
359
    self._memcache[job_id] = job
360
    logging.debug("Added job %d to the cache", job_id)
361
    return job
350 362

  
351 363
  def _GetJobsUnlocked(self, job_ids):
352 364
    if not job_ids:
......
369 381
    # Write to disk
370 382
    self._UpdateJobUnlocked(job)
371 383

  
384
    logging.debug("Added new job %d to the cache", job_id)
385
    self._memcache[job_id] = job
386

  
372 387
    return job
373 388

  
374 389
  def _UpdateJobUnlocked(self, job):
......
378 393
    logging.debug("Writing job %s to %s", job.id, filename)
379 394
    utils.WriteFile(filename,
380 395
                    data=serializer.DumpJson(job.Serialize(), indent=False))
396
    self._CleanCacheUnlocked(exceptions=[job.id])
397

  
398
  def _CleanCacheUnlocked(self, exceptions=None):
399
    """Clean the memory cache.
400

  
401
    The exceptions argument contains job IDs that should not be
402
    cleaned.
403

  
404
    """
405
    assert isinstance(exceptions, list)
406
    for job in self._memcache.values():
407
      if job.id in exceptions:
408
        continue
409
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
410
                                 constants.JOB_STATUS_RUNNING):
411
        logging.debug("Cleaning job %d from the cache", job.id)
412
        try:
413
          del self._memcache[job.id]
414
        except KeyError:
415
          pass
381 416

  
382 417
  @utils.LockedMethod
383 418
  def UpdateJob(self, job):

Also available in: Unified diff