Evicting LRU UserActors leads to their death ultimately.
authorChristos KK Loverdos <loverdos@gmail.com>
Tue, 20 Dec 2011 09:20:14 +0000 (11:20 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Tue, 20 Dec 2011 09:20:14 +0000 (11:20 +0200)
logic/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala
logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala
logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorMessage.scala
logic/src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala

index 9088b1a..4682928 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.processor.actor
 
 import gr.grnet.aquarium.actor.ActorMessage
+import gr.grnet.aquarium.user.actor.UserActorMessage
 
 /**
  * This is the base class of the messages the Dispatcher understands.
index bace270..37a8841 100644 (file)
 package gr.grnet.aquarium.user.actor
 
 import gr.grnet.aquarium.user.UserState
-import gr.grnet.aquarium.actor.{ActorProviderConfigured, ActorProvider, UserActorRole, AquariumActor}
 import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.processor.actor.{UserResponseGetBalance, UserRequestGetBalance}
+import scala.PartialFunction
+import gr.grnet.aquarium.actor._
 
 
 /**
@@ -52,52 +53,88 @@ class UserActor extends AquariumActor with Loggable {
   @volatile
   private[this] var _isInitialized: Boolean = false
   @volatile
+  private[this] var _isParked: Boolean = false
+
+  @volatile
   private[this] var _userState: UserState = _
   @volatile
   private[this] var _actorProvider: ActorProvider = _
 
   def role = UserActorRole
 
-  protected def receive = {
+  private[this] def _checkNotParked(m: ActorMessage): Boolean = {
+    if(_isParked) {
+      logger.error("UserActor %s for userId %s is parked but %s was sent to it".format(this, this._userId, m))
+      false
+    } else {
+      true
+    }
+  }
+
+  private[this] def _selfCheckToStop(): Unit = {
+    self ! UserActorCheckToStop
+  }
+
+  protected def receive: Receive = {
+    case UserActorPark ⇒
+      this._isParked = true
+
+    case UserActorCheckToStop ⇒
+      if(_isParked) {
+        self ! UserActorStop
+      }
+
+    case UserActorStop ⇒
+      self.stop()
+
     case m @ UserActorInitWithUserId(userId) ⇒
-      this._userId = userId
-      this._isInitialized = true
-      // TODO: query DB etc to get internal state
-      logger.info("Setup my userId = %s".format(userId))
+      if(_checkNotParked(m)) {
+        this._userId = userId
+        this._isInitialized = true
+        // TODO: query DB etc to get internal state
+        logger.info("Setup my userId = %s".format(userId))
+      }
+      _selfCheckToStop()
 
     case m @ ActorProviderConfigured(actorProvider) ⇒
-      this._actorProvider = actorProvider
-      logger.info("Configured %s with %s".format(this, m))
+      if(_checkNotParked(m)) {
+        this._actorProvider = actorProvider
+        logger.info("Configured %s with %s".format(this, m))
+      }
+      _selfCheckToStop()
 
     case m @ UserRequestGetBalance(userId, timestamp) ⇒
-      if(this._userId != userId) {
-        logger.error("Received %s but my userId = %s".format(m, this._userId))
-        // TODO: throw an exception here
-      } else {
-        // This is the big party.
-        // Get the user state, if it exists and make sure it is not stale.
-
-        // Do we have a user state?
-        if(_userState ne null) {
-          // Yep, we do. See what there is inside it.
-          val credits = _userState.credits
-          val creditsTimestamp = credits.snapshotTime
-
-          // Check if data is stale
-          if(creditsTimestamp + 10000 > timestamp) {
-            // No, it's OK
-            self reply UserResponseGetBalance(userId, credits.data)
+      if(_checkNotParked(m)) {
+        if(this._userId != userId) {
+          logger.error("Received %s but my userId = %s".format(m, this._userId))
+          // TODO: throw an exception here
+        } else {
+          // This is the big party.
+          // Get the user state, if it exists and make sure it is not stale.
+
+          // Do we have a user state?
+          if(_userState ne null) {
+            // Yep, we do. See what there is inside it.
+            val credits = _userState.credits
+            val creditsTimestamp = credits.snapshotTime
+
+            // Check if data is stale
+            if(creditsTimestamp + 10000 > timestamp) {
+              // No, it's OK
+              self reply UserResponseGetBalance(userId, credits.data)
+            } else {
+              // Yep, data is stale and must recompute balance
+              // FIXME: implement
+              logger.error("FIXME: Should have computed a new value for %s".format(credits))
+              self reply UserResponseGetBalance(userId, credits.data)
+            }
           } else {
-            // Yep, data is stale and must recompute balance
+            // Nope. No user state exists. Must reproduce one
             // FIXME: implement
-            logger.error("FIXME: Should have computed a new value for %s".format(credits))
-            self reply UserResponseGetBalance(userId, credits.data)
+            logger.error("FIXME: Should have computed the user state for userId = %s".format(userId))
           }
-        } else {
-          // Nope. No user state exists. Must reproduce one
-          // FIXME: implement
-          logger.error("FIXME: Should have computed the user state for userId = %s".format(userId))
         }
       }
+      _selfCheckToStop()
   }
 }
\ No newline at end of file
index a7a33df..12defad 100644 (file)
 
 package gr.grnet.aquarium.user.actor
 
+import gr.grnet.aquarium.actor.ActorMessage
+
 /**
+ * Messages handled by a UserActor.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-sealed trait UserActorMessage
+trait UserActorMessage extends ActorMessage
+
+case class UserActorInitWithUserId(userId: String) extends UserActorMessage
 
-case class UserActorInitWithUserId(userId: String) extends UserActorMessage
\ No newline at end of file
+case object UserActorPark extends UserActorMessage
+case object UserActorCheckToStop extends UserActorMessage
+case object UserActorStop extends UserActorMessage
\ No newline at end of file
index 8508b0a..b69ba00 100644 (file)
@@ -58,7 +58,7 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li
     true,
     false,
     EvictionListener)
-  
+
   def put(userId: String, userActor: ActorRef): Unit = {
     _cache.put(userId, userActor)
   }
@@ -80,8 +80,11 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li
   }
   
   private[this] object EvictionListener extends ConcurrentLRUCache.EvictionListener[String, ActorRef] with Loggable {
-    def evictedEntry(userId: String, actorRef: ActorRef) = {
-      logger.debug("Evicting UserActor for userId = %s".format(userId))
+    def evictedEntry(userId: String, userActor: ActorRef): Unit = {
+      logger.debug("Parking UserActor for userId = %s".format(userId))
+      userActor ! UserActorPark
+      // hopefully no need to further track these actors as they now enter a state machine which ultimately leads
+      // to their shutting down
     }
   }
 }