Revision 5685c1a5 lib/jqueue.py

b/lib/jqueue.py
1 1
#
2 2
#
3 3

  
4
# Copyright (C) 2006, 2007 Google Inc.
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
33 33
import errno
34 34
import re
35 35
import time
36
import weakref
36 37

  
37 38
from ganeti import constants
38 39
from ganeti import serializer
......
312 313

  
313 314
  def __init__(self, context):
314 315
    self.context = context
315
    self._memcache = {}
316
    self._memcache = weakref.WeakValueDictionary()
316 317
    self._my_hostname = utils.HostInfo().name
317 318

  
318 319
    # Locking
......
489 490
            if self._RE_JOB_FILE.match(name)]
490 491

  
491 492
  def _LoadJobUnlocked(self, job_id):
492
    if job_id in self._memcache:
493
    job = self._memcache.get(job_id, None)
494
    if job:
493 495
      logging.debug("Found job %s in memcache", job_id)
494
      return self._memcache[job_id]
496
      return job
495 497

  
496 498
    filepath = self._GetJobPath(job_id)
497 499
    logging.debug("Loading job from %s", filepath)
......
536 538
    # Write to disk
537 539
    self.UpdateJobUnlocked(job)
538 540

  
539
    logging.debug("Added new job %s to the cache", job_id)
541
    logging.debug("Adding new job %s to the cache", job_id)
540 542
    self._memcache[job_id] = job
541 543

  
542 544
    # Add to worker pool
......
550 552
    data = serializer.DumpJson(job.Serialize(), indent=False)
551 553
    logging.debug("Writing job %s to %s", job.id, filename)
552 554
    self._WriteAndReplicateFileUnlocked(filename, data)
553
    self._CleanCacheUnlocked([job.id])
554 555

  
555 556
    # Notify waiters about potential changes
556 557
    job.change.notifyAll()
557 558

  
558
  def _CleanCacheUnlocked(self, exclude):
559
    """Clean the memory cache.
560

  
561
    The exceptions argument contains job IDs that should not be
562
    cleaned.
563

  
564
    """
565
    assert isinstance(exclude, list)
566

  
567
    for job in self._memcache.values():
568
      if job.id in exclude:
569
        continue
570
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
571
                                  constants.JOB_STATUS_RUNNING):
572
        logging.debug("Cleaning job %s from the cache", job.id)
573
        try:
574
          del self._memcache[job.id]
575
        except KeyError:
576
          pass
577

  
578 559
  @utils.LockedMethod
579 560
  @_RequireOpenQueue
580 561
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
......
679 660
      logging.debug("Job %s is not yet done", job.id)
680 661
      return
681 662

  
682
    try:
683
      old = self._GetJobPath(job.id)
684
      new = self._GetArchivedJobPath(job.id)
663
    old = self._GetJobPath(job.id)
664
    new = self._GetArchivedJobPath(job.id)
685 665

  
686
      self._RenameFileUnlocked(old, new)
666
    self._RenameFileUnlocked(old, new)
687 667

  
688
      logging.debug("Successfully archived job %s", job.id)
689
    finally:
690
      # Cleaning the cache because we don't know what os.rename actually did
691
      # and to be on the safe side.
692
      self._CleanCacheUnlocked([])
668
    logging.debug("Successfully archived job %s", job.id)
693 669

  
694 670
  def _GetJobInfoUnlocked(self, job, fields):
695 671
    row = []

Also available in: Unified diff