import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
import com.typesafe.config.ConfigFactory
import java.util.concurrent.atomic.AtomicBoolean
-import com.google.common.cache.{RemovalNotification, RemovalListener, CacheBuilder, Cache}
+import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
import com.ckkloverdos.props.{Props ⇒ KKProps}
import gr.grnet.aquarium.actor.service.user.UserActor
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
@volatile private[this] var _actorSystem: ActorSystem = _
@volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
@volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
- @volatile private[this] var _cacheMaximumSize = 1000
- @volatile private[this] var _cacheInitialCapacity = 100
- @volatile private[this] var _cacheConcurrencyLevel = 20
+ @volatile private[this] var _cacheMaximumSize: Int = _ // default-initialized (0) until configure() assigns it; passed to CacheBuilder.maximumSize
+ @volatile private[this] var _cacheInitialCapacity: Int = _ // default-initialized (0) until configure() assigns it; passed to CacheBuilder.initialCapacity
+ @volatile private[this] var _cacheConcurrencyLevel: Int = _ // default-initialized (0) until configure() assigns it; NOTE(review): CacheBuilder.concurrencyLevel rejects non-positive values, so configure() presumably must run before the cache is built — confirm
private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
*
* If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
*/
- def configure(props: KKProps): Unit = {}
+ def configure(props: KKProps): Unit = { // NOTE(review): `props` is not read here; the cache settings below are hard-coded
+ this._cacheMaximumSize = 1000
+ this._cacheInitialCapacity = 2 * this._cacheMaximumSize / 3 // integer arithmetic: two thirds of the maximum size (666)
+ this._cacheConcurrencyLevel = Runtime.getRuntime.availableProcessors() * 2 // twice the processor count reported by the JVM
+ }
def actorSystem = {
if(this._actorSystem eq null) {
this._actorSystem
}
+ def foreachCachedUserID[A](f: String ⇒ A): Unit = { // applies `f` to every key (userID) currently held in the user-actor cache
+ val keys = this._userActorCache.asMap().keySet().iterator() // NOTE(review): _userActorCache is declared uninitialized (null) and assigned when the cache is built; presumably only valid after start() — confirm
+ while(keys.hasNext) { f(keys.next()) } // the value returned by `f` is discarded
+ }
+
+ def cacheStats: CacheStats = { // statistics of the user-actor cache, as reported by the cache itself
+ this._userActorCache.stats() // the cache is built with recordStats(), so counters are actually collected
+ }
+
+ def cacheSize: Long = { // number of entries in the user-actor cache
+ this._userActorCache.size() // Guava documents Cache.size() as an approximate count
+ }
+
def start() = {
// We have AKKA builtin, so no need to mess with pre-existing installation.
if(SysEnvs.AKKA_HOME.value.isJust) {
val userID = rn.getKey
val actorRef = rn.getValue
+ val cause = rn.getCause
- logger.debug("Due to memory constraints, unloading UserActor for userID = %s".format(userID))
-
- gracefullyStopUserActor(userID, actorRef)
+ if(rn.wasEvicted()) {
+ logger.debug("Evicted UserActor %s due to %s".format(userID, cause))
+ gracefullyStopUserActor(userID, actorRef)
+ } else {
+ logger.debug("UserActor %s cache notification for %s".format(userID, cause))
+ }
}
}
this._userActorCache = CacheBuilder.
newBuilder().
- maximumSize(_cacheMaximumSize).
- initialCapacity(_cacheInitialCapacity).
- concurrencyLevel(_cacheConcurrencyLevel).
+ recordStats(). // turn on statistics collection so that stats() reports real counters
+ maximumSize(this._cacheMaximumSize).
+ initialCapacity(this._cacheInitialCapacity).
+ concurrencyLevel(this._cacheConcurrencyLevel).
removalListener(this._cacheEvictionListener).
build()
+ this._userActorCache.stats() // NOTE(review): the returned CacheStats is discarded, so this call looks like a no-op — confirm and consider removing
+
this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
- logger.debug("Created %s".format(this._actorSystem))
+ logger.debug("Created %s %s".format(shortClassNameOf(this._actorSystem), this._actorSystem)) // logs the result of shortClassNameOf followed by the actor system itself
}
def stop() = {
}
def notifyUserActorPostStop(userActor: UserActor): Unit = {
+ logger.debug("Removing UserActor %s from stopping set (after postStop())".format(userActor.userID)) // trace the removal performed on the next line
this.stoppingUserActors.remove(userActor.userID)
}
private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
+ logger.debug("Gracefully stopping UserActor %s (and inserting into stopping set)".format(userID))
this.stoppingUserActors.put(
userID,
akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
// If stopping, wait to stop or ignore
this.stoppingUserActors.get(userID) match {
case null ⇒
+ logger.debug("UserActor %s was not stopping (don't know if it exists yet)".format(userID))
case future ⇒
try {
- Await.result(future, Duration(500, TimeUnit.MILLISECONDS))
+ logger.debug("Waiting while UserActor %s is stopping".format(userID))
+ Await.result(future, Duration(1000, TimeUnit.MILLISECONDS))
}
catch {
case e: Throwable ⇒
logger.error("While Await(ing) UserActor %s to stop".format(userID), e)
}
- this.stoppingUserActors.remove(userID)
+ finally {
+ this.stoppingUserActors.remove(userID)
+ }
}
this._userActorCache.get(userID, new Callable[ActorRef] {
def call(): ActorRef = {
// Create new User Actor instance
+ logger.debug("Creating new UserActor instance for %s".format(userID))
val actorRef = _actorSystem.actorOf(Props.apply({
val actor = aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName)
- actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
+ actor.awareOfAquarium(AquariumCreatedEvent(aquarium))
actor
}), "userActor::%s".format(userID))