From 7889024b0ec3573862ae767c0889627433f45094 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Tue, 20 Dec 2011 11:20:14 +0200 Subject: [PATCH] Evicting LRU UserActors leads to their death ultimately. --- .../processor/actor/DispatcherMessage.scala | 1 + .../gr/grnet/aquarium/user/actor/UserActor.scala | 101 +++++++++++++------- .../aquarium/user/actor/UserActorMessage.scala | 11 ++- .../grnet/aquarium/user/actor/UserActorsLRU.scala | 9 +- 4 files changed, 85 insertions(+), 37 deletions(-) diff --git a/logic/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala b/logic/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala index 9088b1a..4682928 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala @@ -36,6 +36,7 @@ package gr.grnet.aquarium.processor.actor import gr.grnet.aquarium.actor.ActorMessage +import gr.grnet.aquarium.user.actor.UserActorMessage /** * This is the base class of the messages the Dispatcher understands. diff --git a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala index bace270..37a8841 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala @@ -36,9 +36,10 @@ package gr.grnet.aquarium.user.actor import gr.grnet.aquarium.user.UserState -import gr.grnet.aquarium.actor.{ActorProviderConfigured, ActorProvider, UserActorRole, AquariumActor} import gr.grnet.aquarium.util.Loggable import gr.grnet.aquarium.processor.actor.{UserResponseGetBalance, UserRequestGetBalance} +import scala.PartialFunction +import gr.grnet.aquarium.actor._ /** @@ -52,52 +53,88 @@ class UserActor extends AquariumActor with Loggable { @volatile private[this] var _isInitialized: Boolean = false @volatile + private[this] var _isParked: Boolean = false + + @volatile private[this] var _userState: UserState = _ @volatile private[this] var _actorProvider: ActorProvider = _ def role = UserActorRole - protected def receive = { + private[this] def _checkNotParked(m: ActorMessage): Boolean = { + if(_isParked) { + logger.error("UserActor %s for userId %s is parked but %s was sent to it".format(this, this._userId, m)) + false + } else { + true + } + } + + private[this] def _selfCheckToStop(): Unit = { + self ! UserActorCheckToStop + } + + protected def receive: Receive = { + case UserActorPark ⇒ + this._isParked = true + + case UserActorCheckToStop ⇒ + if(_isParked) { + self ! UserActorStop + } + + case UserActorStop ⇒ + self.stop() + case m @ UserActorInitWithUserId(userId) ⇒ - this._userId = userId - this._isInitialized = true - // TODO: query DB etc to get internal state - logger.info("Setup my userId = %s".format(userId)) + if(_checkNotParked(m)) { + this._userId = userId + this._isInitialized = true + // TODO: query DB etc to get internal state + logger.info("Setup my userId = %s".format(userId)) + } + _selfCheckToStop() case m @ ActorProviderConfigured(actorProvider) ⇒ - this._actorProvider = actorProvider - logger.info("Configured %s with %s".format(this, m)) + if(_checkNotParked(m)) { + this._actorProvider = actorProvider + logger.info("Configured %s with %s".format(this, m)) + } + _selfCheckToStop() case m @ UserRequestGetBalance(userId, timestamp) ⇒ - if(this._userId != userId) { - logger.error("Received %s but my userId = %s".format(m, this._userId)) - // TODO: throw an exception here - } else { - // This is the big party. - // Get the user state, if it exists and make sure it is not stale. - - // Do we have a user state? - if(_userState ne null) { - // Yep, we do. See what there is inside it. - val credits = _userState.credits - val creditsTimestamp = credits.snapshotTime - - // Check if data is stale - if(creditsTimestamp + 10000 > timestamp) { - // No, it's OK - self reply UserResponseGetBalance(userId, credits.data) + if(_checkNotParked(m)) { + if(this._userId != userId) { + logger.error("Received %s but my userId = %s".format(m, this._userId)) + // TODO: throw an exception here + } else { + // This is the big party. + // Get the user state, if it exists and make sure it is not stale. + + // Do we have a user state? + if(_userState ne null) { + // Yep, we do. See what there is inside it. + val credits = _userState.credits + val creditsTimestamp = credits.snapshotTime + + // Check if data is stale + if(creditsTimestamp + 10000 > timestamp) { + // No, it's OK + self reply UserResponseGetBalance(userId, credits.data) + } else { + // Yep, data is stale and must recompute balance + // FIXME: implement + logger.error("FIXME: Should have computed a new value for %s".format(credits)) + self reply UserResponseGetBalance(userId, credits.data) + } } else { - // Yep, data is stale and must recompute balance + // Nope. No user state exists. Must reproduce one // FIXME: implement - logger.error("FIXME: Should have computed a new value for %s".format(credits)) - self reply UserResponseGetBalance(userId, credits.data) + logger.error("FIXME: Should have computed the user state for userId = %s".format(userId)) } - } else { - // Nope. No user state exists. Must reproduce one - // FIXME: implement - logger.error("FIXME: Should have computed the user state for userId = %s".format(userId)) } } + _selfCheckToStop() } } \ No newline at end of file diff --git a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala index a7a33df..12defad 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala @@ -35,11 +35,18 @@ package gr.grnet.aquarium.user.actor +import gr.grnet.aquarium.actor.ActorMessage + /** + * Messages handled by a UserActor. * * @author Christos KK Loverdos */ -sealed trait UserActorMessage +trait UserActorMessage extends ActorMessage + +case class UserActorInitWithUserId(userId: String) extends UserActorMessage -case class UserActorInitWithUserId(userId: String) extends UserActorMessage \ No newline at end of file +case object UserActorPark extends UserActorMessage +case object UserActorCheckToStop extends UserActorMessage +case object UserActorStop extends UserActorMessage \ No newline at end of file diff --git a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala index 8508b0a..b69ba00 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala @@ -58,7 +58,7 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li true, false, EvictionListener) - + def put(userId: String, userActor: ActorRef): Unit = { _cache.put(userId, userActor) } @@ -80,8 +80,11 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li } private[this] object EvictionListener extends ConcurrentLRUCache.EvictionListener[String, ActorRef] with Loggable { - def evictedEntry(userId: String, actorRef: ActorRef) = { - logger.debug("Evicting UserActor for userId = %s".format(userId)) + def evictedEntry(userId: String, userActor: ActorRef): Unit = { + logger.debug("Parking UserActor for userId = %s".format(userId)) + userActor ! UserActorPark + // hopefully no need to further track these actors as they now enter a state machine which ultimately leads + // to their shutting down } } } -- 1.7.10.4