Properly handle user actor eviction
author: Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jul 2012 11:03:36 +0000 (14:03 +0300)
committer: Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jul 2012 11:03:36 +0000 (14:03 +0300)
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala

index 1f819bb..ec22e55 100644 (file)
@@ -71,6 +71,7 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   override def postStop() {
+    DEBUG("I am finally stopped (in postStop())")
     aquarium.akkaService.notifyUserActorPostStop(this)
   }
 
index 8a161fb..f816ca9 100644 (file)
@@ -41,7 +41,7 @@ import gr.grnet.aquarium.ResourceLocator.SysEnvs
 import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
 import com.typesafe.config.ConfigFactory
 import java.util.concurrent.atomic.AtomicBoolean
-import com.google.common.cache.{RemovalNotification, RemovalListener, CacheBuilder, Cache}
+import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
 import com.ckkloverdos.props.{Props ⇒ KKProps}
 import gr.grnet.aquarium.actor.service.user.UserActor
 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
@@ -61,9 +61,9 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
   @volatile private[this] var _actorSystem: ActorSystem = _
   @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
   @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
-  @volatile private[this] var _cacheMaximumSize = 1000
-  @volatile private[this] var _cacheInitialCapacity = 100
-  @volatile private[this] var _cacheConcurrencyLevel = 20
+  @volatile private[this] var _cacheMaximumSize: Int = _
+  @volatile private[this] var _cacheInitialCapacity: Int = _
+  @volatile private[this] var _cacheConcurrencyLevel: Int = _
 
   private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
 
@@ -76,7 +76,11 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
    *
    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
    */
-  def configure(props: KKProps): Unit = {}
+  def configure(props: KKProps): Unit = {
+    this._cacheMaximumSize = 1000
+    this._cacheInitialCapacity = 2 * this._cacheMaximumSize / 3
+    this._cacheConcurrencyLevel = Runtime.getRuntime.availableProcessors() * 2
+  }
 
   def actorSystem = {
     if(this._actorSystem eq null) {
@@ -90,6 +94,19 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
     this._actorSystem
   }
 
+  def foreachCachedUserID[A](f: String ⇒ A): Unit = {
+    val keys = this._userActorCache.asMap().keySet().iterator()
+    while(keys.hasNext) { f(keys.next()) }
+  }
+
+  def cacheStats: CacheStats = {
+    this._userActorCache.stats()
+  }
+
+  def cacheSize: Long = {
+    this._userActorCache.size()
+  }
+
   def start() = {
     // We have AKKA builtin, so no need to mess with pre-existing installation.
     if(SysEnvs.AKKA_HOME.value.isJust) {
@@ -106,23 +123,30 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
 
         val userID   = rn.getKey
         val actorRef = rn.getValue
+        val cause    = rn.getCause
 
-        logger.debug("Due to memory constraints, unloading UserActor for userID = %s".format(userID))
-
-        gracefullyStopUserActor(userID, actorRef)
+        if(rn.wasEvicted()) {
+          logger.debug("Evicted UserActor %s due to %s".format(userID, cause))
+          gracefullyStopUserActor(userID, actorRef)
+        } else {
+          logger.debug("UserActor %s cache notification for %s".format(userID, cause))
+        }
       }
     }
 
     this._userActorCache = CacheBuilder.
       newBuilder().
-        maximumSize(_cacheMaximumSize).
-        initialCapacity(_cacheInitialCapacity).
-        concurrencyLevel(_cacheConcurrencyLevel).
+        recordStats().
+        maximumSize(this._cacheMaximumSize).
+        initialCapacity(this._cacheInitialCapacity).
+        concurrencyLevel(this._cacheConcurrencyLevel).
         removalListener(this._cacheEvictionListener).
       build()
 
+    this._userActorCache.stats()
+
     this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
-    logger.debug("Created %s".format(this._actorSystem))
+    logger.debug("Created %s %s".format(shortClassNameOf(this._actorSystem), this._actorSystem))
   }
 
   def stop() = {
@@ -139,10 +163,12 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
   }
 
   def notifyUserActorPostStop(userActor: UserActor): Unit = {
+    logger.debug("Removing UserActor %s from stopping set (after postStop())".format(userActor.userID))
     this.stoppingUserActors.remove(userActor.userID)
   }
 
   private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
+    logger.debug("Gracefully stopping UserActor %s (and inserting into stopping set)".format(userID))
     this.stoppingUserActors.put(
       userID,
       akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
@@ -172,24 +198,29 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
     // If stopping, wait to stop or ignore
     this.stoppingUserActors.get(userID) match {
       case null ⇒
+        logger.debug("UserActor %s was not stopping (don't know if it exists yet)".format(userID))
 
       case future ⇒
         try {
-          Await.result(future, Duration(500, TimeUnit.MILLISECONDS))
+          logger.debug("Waiting while UserActor %s is stopping".format(userID))
+          Await.result(future, Duration(1000, TimeUnit.MILLISECONDS))
         }
         catch {
           case e: Throwable ⇒
             logger.error("While Await(ing) UserActor %s to stop".format(userID), e)
         }
-        this.stoppingUserActors.remove(userID)
+        finally {
+          this.stoppingUserActors.remove(userID)
+        }
     }
 
     this._userActorCache.get(userID, new Callable[ActorRef] {
       def call(): ActorRef = {
         // Create new User Actor instance
+        logger.debug("Creating new UserActor instance for %s".format(userID))
         val actorRef = _actorSystem.actorOf(Props.apply({
           val actor = aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName)
-          actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
+          actor.awareOfAquarium(AquariumCreatedEvent(aquarium))
           actor
         }), "userActor::%s".format(userID))