From 923b16bfee82e7297479674b16c0051b548ce618 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Mon, 2 Jul 2012 14:03:36 +0300 Subject: [PATCH] Properly handle user actor eviction --- .../aquarium/actor/service/user/UserActor.scala | 1 + .../gr/grnet/aquarium/service/AkkaService.scala | 61 +++++++++++++++----- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 1f819bb..ec22e55 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -71,6 +71,7 @@ class UserActor extends ReflectiveRoleableActor { } override def postStop() { + DEBUG("I am finally stopped (in postStop())") aquarium.akkaService.notifyUserActorPostStop(this) } diff --git a/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala b/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala index 8a161fb..f816ca9 100644 --- a/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala @@ -41,7 +41,7 @@ import gr.grnet.aquarium.ResourceLocator.SysEnvs import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError} import com.typesafe.config.ConfigFactory import java.util.concurrent.atomic.AtomicBoolean -import com.google.common.cache.{RemovalNotification, RemovalListener, CacheBuilder, Cache} +import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache} import com.ckkloverdos.props.{Props ⇒ KKProps} import gr.grnet.aquarium.actor.service.user.UserActor import gr.grnet.aquarium.service.event.AquariumCreatedEvent @@ -61,9 +61,9 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif @volatile private[this] var _actorSystem: ActorSystem = _ @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _ @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _ - @volatile private[this] var _cacheMaximumSize = 1000 - @volatile private[this] var _cacheInitialCapacity = 100 - @volatile private[this] var _cacheConcurrencyLevel = 20 + @volatile private[this] var _cacheMaximumSize: Int = _ + @volatile private[this] var _cacheInitialCapacity: Int = _ + @volatile private[this] var _cacheConcurrencyLevel: Int = _ private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]] @@ -76,7 +76,11 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif * * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix. */ - def configure(props: KKProps): Unit = {} + def configure(props: KKProps): Unit = { + this._cacheMaximumSize = 1000 + this._cacheInitialCapacity = 2 * this._cacheMaximumSize / 3 + this._cacheConcurrencyLevel = Runtime.getRuntime.availableProcessors() * 2 + } def actorSystem = { if(this._actorSystem eq null) { @@ -90,6 +94,19 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif this._actorSystem } + def foreachCachedUserID[A](f: String ⇒ A): Unit = { + val keys = this._userActorCache.asMap().keySet().iterator() + while(keys.hasNext) { f(keys.next()) } + } + + def cacheStats: CacheStats = { + this._userActorCache.stats() + } + + def cacheSize: Long = { + this._userActorCache.size() + } + def start() = { // We have AKKA builtin, so no need to mess with pre-existing installation. if(SysEnvs.AKKA_HOME.value.isJust) { @@ -106,23 +123,30 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif val userID = rn.getKey val actorRef = rn.getValue + val cause = rn.getCause - logger.debug("Due to memory constraints, unloading UserActor for userID = %s".format(userID)) - - gracefullyStopUserActor(userID, actorRef) + if(rn.wasEvicted()) { + logger.debug("Evicted UserActor %s due to %s".format(userID, cause)) + gracefullyStopUserActor(userID, actorRef) + } else { + logger.debug("UserActor %s cache notification for %s".format(userID, cause)) + } } } this._userActorCache = CacheBuilder. newBuilder(). - maximumSize(_cacheMaximumSize). - initialCapacity(_cacheInitialCapacity). - concurrencyLevel(_cacheConcurrencyLevel). + recordStats(). + maximumSize(this._cacheMaximumSize). + initialCapacity(this._cacheInitialCapacity). + concurrencyLevel(this._cacheConcurrencyLevel). removalListener(this._cacheEvictionListener). build() + this._userActorCache.stats() + this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf")) - logger.debug("Created %s".format(this._actorSystem)) + logger.debug("Created %s %s".format(shortClassNameOf(this._actorSystem), this._actorSystem)) } def stop() = { @@ -139,10 +163,12 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif } def notifyUserActorPostStop(userActor: UserActor): Unit = { + logger.debug("Removing UserActor %s from stopping set (after postStop())".format(userActor.userID)) this.stoppingUserActors.remove(userActor.userID) } private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = { + logger.debug("Gracefully stopping UserActor %s (and inserting into stopping set)".format(userID)) this.stoppingUserActors.put( userID, akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem) @@ -172,24 +198,29 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif // If stopping, wait to stop or ignore this.stoppingUserActors.get(userID) match { case null ⇒ + logger.debug("UserActor %s was not stopping (don't know if it exists yet)".format(userID)) case future ⇒ try { - Await.result(future, Duration(500, TimeUnit.MILLISECONDS)) + logger.debug("Waiting while UserActor %s is stopping".format(userID)) + Await.result(future, Duration(1000, TimeUnit.MILLISECONDS)) } catch { case e: Throwable ⇒ logger.error("While Await(ing) UserActor %s to stop".format(userID), e) } - this.stoppingUserActors.remove(userID) + finally { + this.stoppingUserActors.remove(userID) + } } this._userActorCache.get(userID, new Callable[ActorRef] { def call(): ActorRef = { // Create new User Actor instance + logger.debug("Creating new UserActor instance for %s".format(userID)) val actorRef = _actorSystem.actorOf(Props.apply({ val actor = aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName) - actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium)) + actor.awareOfAquarium(AquariumCreatedEvent(aquarium)) actor }), "userActor::%s".format(userID)) -- 1.7.10.4