From cb3f0ce09921fda97c9fbae2de114eb6fbf48ba5 Mon Sep 17 00:00:00 2001 From: Georgios Gousios Date: Tue, 20 Mar 2012 13:40:54 +0200 Subject: [PATCH] User actor supervisor, actor cache based on Guava --- pom.xml | 6 ++ project/build/Aquarium.scala | 2 + .../gr/grnet/aquarium/user/actor/UserActor.scala | 9 +-- .../grnet/aquarium/user/actor/UserActorCache.scala | 81 ++++++++++++++++++++ .../aquarium/user/actor/UserActorManager.scala | 9 ++- .../aquarium/user/actor/UserActorMessage.scala | 2 - .../aquarium/user/actor/UserActorSupervisor.scala | 21 +++++ .../grnet/aquarium/user/actor/UserActorsLRU.scala | 12 ++- 8 files changed, 127 insertions(+), 15 deletions(-) create mode 100644 src/main/scala/gr/grnet/aquarium/user/actor/UserActorCache.scala create mode 100644 src/main/scala/gr/grnet/aquarium/user/actor/UserActorSupervisor.scala diff --git a/pom.xml b/pom.xml index 9efe752..bdae97f 100644 --- a/pom.xml +++ b/pom.xml @@ -306,6 +306,12 @@ --> + com.google.guava + guava + 11.0.2 + + + org.apache.solr solr-core 3.5.0 diff --git a/project/build/Aquarium.scala b/project/build/Aquarium.scala index 3c88069..0d9c7a7 100644 --- a/project/build/Aquarium.scala +++ b/project/build/Aquarium.scala @@ -132,4 +132,6 @@ class Aquarium(info: ProjectInfo) extends DefaultProject(info) { val lib_joda_conv = "org.joda" % "joda-convert" % "1.1" withSources() val lib_test = "com.novocode" % "junit-interface" % "0.7" % "test->default" + + val lib_guava = "com.google.guava" % "guava" % "11.0.2" withSources() } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala index 301025a..12b03b0 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala @@ -39,13 +39,12 @@ import gr.grnet.aquarium.actor._ import gr.grnet.aquarium.Configurator import gr.grnet.aquarium.processor.actor._ import gr.grnet.aquarium.user._ -import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent} +import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry} import java.util.Date -import gr.grnet.aquarium.util.{DateUtils, Loggable} -import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource} +import gr.grnet.aquarium.util.Loggable import gr.grnet.aquarium.util.date.TimeHelpers -import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just} -import gr.grnet.aquarium.logic.accounting.{RoleAgreements, AccountingException, Policy, Accounting} +import com.ckkloverdos.maybe.{Failed, NoVal, Just} +import gr.grnet.aquarium.logic.accounting.RoleAgreements /** diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorCache.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorCache.scala new file mode 100644 index 0000000..bee2eec --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorCache.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed +* or implied, of GRNET S.A. + */ +package gr.grnet.aquarium.user.actor + +import akka.actor.ActorRef +import gr.grnet.aquarium.util.{Loggable, Lifecycle} +import com.google.common.cache._ + +/** + * An actor cache implementation using Guava. + * + * @author Georgios Gousios + */ +object UserActorCache extends Lifecycle { + + private lazy val cache : Cache[String, ActorRef] = + CacheBuilder.newBuilder() + .maximumSize(1000) + .initialCapacity(100) + .removalListener(EvictionListener) + .build() + + private[this] object EvictionListener + extends RemovalListener[String, ActorRef] with Loggable { + + def onRemoval(p1: RemovalNotification[String, ActorRef]) { + val userId = p1.getKey + val userActor = p1.getValue + + logger.debug("Parking UserActor for userId = %s".format(userId)) + UserActorSupervisor.supervisor.unlink(userActor) + // Check this is received after any currently servicing business logic message. + userActor.stop() + } + } + + def start() {} + + def stop() = cache.invalidateAll; cache.cleanUp + + def put(userId: String, userActor: ActorRef): Unit = + cache.put(userId, userActor) + + def get(userId: String): Option[ActorRef] = + cache.getIfPresent(userId) match { + case null ⇒ None + case actorRef ⇒ Some(actorRef) + } +} diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala index 8c5508a..d205a9a 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala @@ -57,7 +57,6 @@ import gr.grnet.aquarium.processor.actor._ class UserActorManager extends AquariumActor with Loggable { // TODO: Get the constructor values from configuration - private[this] val userActorLRU = new UserActorsLRU(1000, 800) @volatile private[this] var _actorProvider: ActorProvider = _ @@ -67,19 +66,21 @@ class UserActorManager extends AquariumActor with Loggable { // create a fresh instance val userActor = _actorProvider.actorForRole(UserActorRole) userActor ! UserActorInitWithUserId(userId) + UserActorSupervisor.supervisor.link(userActor) + logger.info("New actor for userId: %s".format(userId)) userActor } private[this] def _forwardToUserActor(userId: String, m: DispatcherMessage): Unit = { logger.debug("Received %s".format(m)) - userActorLRU.get(userId) match { + UserActorCache.get(userId) match { case Some(userActor) ⇒ logger.debug("Found user actor and forwarding request %s".format(m)) userActor forward m case None ⇒ logger.debug("Not found user actor for request %s. Launching new actor".format(m)) val userActor = _launchUserActor(userId) - userActorLRU.put(userId, userActor) + UserActorCache.put(userId, userActor) logger.debug("Launched new user actor and forwarding request %s".format(m)) userActor forward m } @@ -108,6 +109,6 @@ class UserActorManager extends AquariumActor with Loggable { override def postStop = { logger.debug("Shutting down and stopping all user actors") - userActorLRU.shutdownAll() + UserActorCache.stop } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala index 1af0fa4..b08bb66 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala @@ -36,8 +36,6 @@ package gr.grnet.aquarium.user.actor import gr.grnet.aquarium.actor.ActorMessage -import gr.grnet.aquarium.logic.accounting.dsl.DSLResource -import java.util.Date /** * Messages handled by a UserActor. diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorSupervisor.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorSupervisor.scala new file mode 100644 index 0000000..d6f6321 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorSupervisor.scala @@ -0,0 +1,21 @@ +package gr.grnet.aquarium.user.actor + +import akka.actor.Supervisor +import akka.config.Supervision.SupervisorConfig +import akka.config.Supervision.OneForOneStrategy + +/** + * Supervisor for user actors + * + * @author Georgios Gousios + */ +object UserActorSupervisor { + + lazy val supervisor = Supervisor(SupervisorConfig( + OneForOneStrategy( + List(classOf[Exception]), //What exceptions will be handled + 50, // maximum number of restart retries + 5000 // within time in millis + ), Nil + )) +} diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala index cd2aeab..0390a2f 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala @@ -30,9 +30,8 @@ * The views and conclusions contained in the software and * documentation are those of the authors and should not be * interpreted as representing official policies, either expressed - * or implied, of GRNET S.A. +* or implied, of GRNET S.A. */ - package gr.grnet.aquarium.user.actor import org.apache.solr.util.ConcurrentLRUCache @@ -72,10 +71,14 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li } def shutdownAll() = { - val accessed = mapAsScalaMap(_cache.getLatestAccessedItems(_cache.size())) + val accessed = mapAsScalaMap(_cache.getLatestAccessedItems(_cache.size())) //Send the poison pill and make sure that all futures have been returned - val futures = accessed.keysIterator.map{x => _cache.get(x).stop()} + accessed.keysIterator.map { + x => + UserActorSupervisor.supervisor.unlink(_cache.get(x)) + _cache.get(x).stop() + } } def size: Int = _cache.size() @@ -90,6 +93,7 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li private[this] object EvictionListener extends ConcurrentLRUCache.EvictionListener[String, ActorRef] with Loggable { def evictedEntry(userId: String, userActor: ActorRef): Unit = { logger.debug("Parking UserActor for userId = %s".format(userId)) + UserActorSupervisor.supervisor.unlink(userActor) // Check this is received after any currently servicing business logic message. userActor.stop() // Hopefully no need to further track these actors as they will now cause their own death. -- 1.7.10.4