Dispatcher -> Router (II)
authorChristos KK Loverdos <loverdos@gmail.com>
Wed, 25 Apr 2012 05:35:17 +0000 (08:35 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Wed, 25 Apr 2012 05:35:17 +0000 (08:35 +0300)
src/main/scala/gr/grnet/aquarium/actor/ActorRole.scala
src/main/scala/gr/grnet/aquarium/actor/message/service/router/RouterMessage.scala [moved from src/main/scala/gr/grnet/aquarium/actor/message/service/dispatcher/DispatcherMessage.scala with 85% similarity]
src/main/scala/gr/grnet/aquarium/actor/service/pinger/PingerActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala [moved from src/main/scala/gr/grnet/aquarium/actor/service/dispatcher/RouterActor.scala with 72% similarity]
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActorManager.scala
src/main/scala/gr/grnet/aquarium/service/IMEventProcessorService.scala
src/main/scala/gr/grnet/aquarium/service/ResourceEventProcessorService.scala

index 9f46ec9..5ff2849 100644 (file)
@@ -35,8 +35,8 @@
 package gr.grnet.aquarium.actor
 
 import message.config.user.UserActorInitWithUserId
-import message.service.dispatcher._
-import service.dispatcher.RouterActor
+import message.service.router._
+import service.router.RouterActor
 import service.pinger.PingerActor
 import service.rest.RESTActor
 import service.user.{UserActorManager, UserActor}
@@ -91,7 +91,7 @@ case object PingerRole
                       Set(classOf[AdminRequestPingAll]))
 
 /**
- * The generic router/dispatcher.
+ * The generic router.
  */
 case object RouterRole
     extends ActorRole("RouterRole",
@@ -35,7 +35,7 @@
 
 package gr.grnet.aquarium.actor.message
 package service
-package dispatcher
+package router
 
 import gr.grnet.aquarium.user.UserState
 import gr.grnet.aquarium.util.json.JsonSupport
@@ -45,15 +45,15 @@ import gr.grnet.aquarium.converter.{PrettyJsonTextFormat, StdConverters}
 
 
 /**
- * This is the base class of the messages the dispatcher understands.
+ * This is the base class of the messages the [[gr.grnet.aquarium.actor.service.router.RouterActor]] understands.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-sealed trait DispatcherMessage extends ActorMessage {
+sealed trait RouterMessage extends ActorMessage {
   def isError: Boolean = false
 }
 
-sealed trait DispatcherResponseMessage extends DispatcherMessage {
+sealed trait RouterResponseMessage extends RouterMessage {
   def error: Option[String]
 
   override def isError = error.isDefined
@@ -69,19 +69,19 @@ sealed trait DispatcherResponseMessage extends DispatcherMessage {
   }
 }
 
-case class RequestUserBalance(userId: String, timestamp: Long) extends DispatcherMessage
+case class RequestUserBalance(userId: String, timestamp: Long) extends RouterMessage
 case class BalanceValue(balance: Double) extends JsonSupport
-case class ResponseUserBalance(userId: String, balance: Double, error: Option[String]) extends DispatcherResponseMessage {
+case class ResponseUserBalance(userId: String, balance: Double, error: Option[String]) extends RouterResponseMessage {
   def responseBody = BalanceValue(balance)
 }
 
-case class UserResponseGetBalance(userId: String, balance: Double) extends DispatcherResponseMessage {
+case class UserResponseGetBalance(userId: String, balance: Double) extends RouterResponseMessage {
   def responseBody = BalanceValue(balance)
   def error = None
 }
 
-case class UserRequestGetState(userId: String, timestamp: Long) extends DispatcherMessage
-case class UserResponseGetState(userId: String, state: UserState) extends DispatcherResponseMessage {
+case class UserRequestGetState(userId: String, timestamp: Long) extends RouterMessage
+case class UserResponseGetState(userId: String, state: UserState) extends RouterResponseMessage {
   def responseBody = state
   val error = None
 }
@@ -91,14 +91,14 @@ case class UserResponseGetState(userId: String, state: UserState) extends Dispat
  *
  * Note that the prefix `Process` means that no reply is created or needed.
  */
-case class ProcessResourceEvent(rcEvent: ResourceEvent) extends DispatcherMessage
+case class ProcessResourceEvent(rcEvent: ResourceEvent) extends RouterMessage
 
 /**
  * Dispatcher message that triggers the user event processing pipeline.
  *
  * Note that the prefix `Process` means that no reply is created or needed.
  */
-case class ProcessIMEvent(imEvent: IMEventModel) extends DispatcherMessage
+case class ProcessIMEvent(imEvent: IMEventModel) extends RouterMessage
 
 
-case class AdminRequestPingAll() extends DispatcherMessage
+case class AdminRequestPingAll() extends RouterMessage
index f10fa8f..2cd8a83 100644 (file)
@@ -39,7 +39,7 @@ package pinger
 
 
 import gr.grnet.aquarium.actor.{ReflectiveAquariumActor, PingerRole}
-import message.service.dispatcher.AdminRequestPingAll
+import message.service.router.AdminRequestPingAll
 
 
 /**
index 2f2715b..aec8865 100644 (file)
@@ -47,7 +47,7 @@ import akka.actor.Actor
 import gr.grnet.aquarium.actor.{RESTRole, AquariumActor, RouterRole}
 import RESTPaths.{UserBalancePath, UserStatePath, AdminPingAll}
 import com.ckkloverdos.maybe.{NoVal, Just}
-import message.service.dispatcher._
+import message.service.router._
 import gr.grnet.aquarium.util.date.TimeHelpers
 import org.joda.time.format.ISODateTimeFormat
 
@@ -148,7 +148,7 @@ class RESTActor(_id: String) extends AquariumActor with Loggable {
 
 
   private[this]
-  def callDispatcher(message: DispatcherMessage, responder: RequestResponder): Unit = {
+  def callDispatcher(message: RouterMessage, responder: RequestResponder): Unit = {
     val configurator = Configurator.MasterConfigurator
     val actorProvider = configurator.actorProvider
     val dispatcher = actorProvider.actorForRole(DispatcherRole)
@@ -166,7 +166,7 @@ class RESTActor(_id: String) extends AquariumActor with Loggable {
 
           case Some(Right(actualResponse)) ⇒
             actualResponse match {
-              case dispatcherResponse: DispatcherResponseMessage if (!dispatcherResponse.isError) ⇒
+              case dispatcherResponse: RouterResponseMessage if (!dispatcherResponse.isError) ⇒
                 //logger.debug("Received response: %s".format(dispatcherResponse))
                 //logger.debug("Received response (JSON): %s".format(dispatcherResponse.toJson))
                 //logger.debug("Received response:body %s".format(dispatcherResponse.responseBody))
@@ -177,7 +177,7 @@ class RESTActor(_id: String) extends AquariumActor with Loggable {
                     body = dispatcherResponse.responseBodyToJson.getBytes("UTF-8"),
                     headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil))
 
-              case dispatcherResponse: DispatcherResponseMessage ⇒
+              case dispatcherResponse: RouterResponseMessage ⇒
                 logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
                 responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
 
 
 package gr.grnet.aquarium.actor
 package service
-package dispatcher
+package router
 
 import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.service.ActorProviderService
-import message.config.ActorProviderConfigured
-import message.service.dispatcher._
+import message.service.router._
 import akka.actor.ActorRef
 import message.config.user.UserActorInitWithUserId
 import user.{UserActorCache, UserActorSupervisor}
+import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
 
 /**
- * Business logic dispatcher. Incoming messages are dispatched to appropriate destinations.
+ * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
+ * appropriately.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-class RouterActor extends AquariumActor with Loggable {
+class RouterActor extends ReflectiveAquariumActor {
   private[this] var _actorProvider: ActorProviderService = _
 
   def role = RouterRole
 
-  private[this] def _forwardToUserManager(m: DispatcherMessage): Unit = {
+  private[this] def _forwardToUserManager(m: RouterMessage): Unit = {
     logger.debug("Received %s".format(m))
     val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
     // forward to the user actor manager, which in turn will
@@ -72,7 +73,7 @@ class RouterActor extends AquariumActor with Loggable {
     userActor
   }
 
-  private[this] def _forwardToUserActor(userId: String, m: DispatcherMessage): Unit = {
+  private[this] def _forwardToUserActor(userId: String, m: RouterMessage): Unit = {
     logger.debug("Received %s".format(m))
     UserActorCache.get(userId) match {
       case Some(userActor) ⇒
@@ -87,23 +88,35 @@ class RouterActor extends AquariumActor with Loggable {
     }
   }
 
-  protected def receive = {
-    case ActorProviderConfigured(actorProvider) ⇒
-      this._actorProvider = actorProvider
-      logger.info("Received actorProvider = %s".format(this._actorProvider))
+  def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
+  }
+
+  def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
+    this._actorProvider = m.actorProvider
+    logger.info("Configured %s with %s".format(this, m))
+  }
+
+  def onRequestUserBalance(m: RequestUserBalance): Unit = {
+    _forwardToUserActor(m.userId, m)
+  }
 
-    case m@RequestUserBalance(userId, timestamp) ⇒
-      _forwardToUserManager(m)
+  def onUserRequestGetState(m: UserRequestGetState): Unit = {
+    _forwardToUserActor(m.userId, m)
+  }
 
-    case m@UserRequestGetState(userId, timestamp) ⇒
-      _forwardToUserManager(m)
+  def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
+    _forwardToUserActor(m.rcEvent.userID, m)
+  }
 
-    case m@ProcessResourceEvent(resourceEvent) ⇒
-      _forwardToUserManager(m)
+  def onProcessIMEvent(m: ProcessIMEvent): Unit = {
+    _forwardToUserActor(m.imEvent.userID, m)
+  }
 
-    case m@ProcessIMEvent(userEvent) ⇒
-      _forwardToUserManager(m)
+  def onAdminRequestPingAll(m: AdminRequestPingAll): Unit = {
+
+  }
 
-    case m@AdminRequestPingAll ⇒
+  override def postStop = {
+    UserActorCache.stop
   }
 }
\ No newline at end of file
index c52f32d..ac9be8a 100644 (file)
@@ -49,7 +49,7 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.logic.accounting.RoleAgreements
 import gr.grnet.aquarium.messaging.AkkaAMQP
 import gr.grnet.aquarium.actor.message.config.user.UserActorInitWithUserId
-import gr.grnet.aquarium.actor.message.service.dispatcher._
+import gr.grnet.aquarium.actor.message.service.router._
 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
 import gr.grnet.aquarium.event.im.IMEventModel
 import gr.grnet.aquarium.event.{WalletEntry}
index 153bcd6..5708553 100644 (file)
@@ -41,7 +41,7 @@ import akka.actor.ActorRef
 import gr.grnet.aquarium.actor._
 import message.config.user.UserActorInitWithUserId
 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
-import message.service.dispatcher._
+import message.service.router._
 import gr.grnet.aquarium.service.ActorProviderService
 
 
@@ -74,7 +74,7 @@ class UserActorManager extends ReflectiveAquariumActor {
     userActor
   }
 
-  private[this] def _forwardToUserActor(userId: String, m: DispatcherMessage): Unit = {
+  private[this] def _forwardToUserActor(userId: String, m: RouterMessage): Unit = {
     logger.debug("Received %s".format(m))
     UserActorCache.get(userId) match {
       case Some(userActor) ⇒
index 558904f..c153504 100644 (file)
@@ -39,7 +39,7 @@ package gr.grnet.aquarium.service
 import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.Configurator.Keys
 import gr.grnet.aquarium.store.LocalFSEventStore
-import gr.grnet.aquarium.actor.message.service.dispatcher.ProcessIMEvent
+import gr.grnet.aquarium.actor.message.service.router.ProcessIMEvent
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.util.makeString
 import com.ckkloverdos.maybe._
index 2185694..1c3db72 100644 (file)
@@ -39,7 +39,7 @@ import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.Configurator.Keys
 import gr.grnet.aquarium.store.LocalFSEventStore
 import com.ckkloverdos.maybe.{Maybe, Just, Failed, NoVal}
-import gr.grnet.aquarium.actor.message.service.dispatcher.ProcessResourceEvent
+import gr.grnet.aquarium.actor.message.service.router.ProcessResourceEvent
 import gr.grnet.aquarium.event.ResourceEvent
 import gr.grnet.aquarium.util.date.TimeHelpers