package gr.grnet.aquarium.actor
import message.config.user.UserActorInitWithUserId
-import message.service.dispatcher._
-import service.dispatcher.RouterActor
+import message.service.router._
+import service.router.RouterActor
import service.pinger.PingerActor
import service.rest.RESTActor
import service.user.{UserActorManager, UserActor}
Set(classOf[AdminRequestPingAll]))
/**
- * The generic router/dispatcher.
+ * The generic router.
*/
case object RouterRole
extends ActorRole("RouterRole",
package gr.grnet.aquarium.actor.message
package service
-package dispatcher
+package router
import gr.grnet.aquarium.user.UserState
import gr.grnet.aquarium.util.json.JsonSupport
/**
- * This is the base class of the messages the dispatcher understands.
+ * This is the base class of the messages the [[gr.grnet.aquarium.actor.service.router.RouterActor]] understands.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-sealed trait DispatcherMessage extends ActorMessage {
+sealed trait RouterMessage extends ActorMessage {
def isError: Boolean = false
}
-sealed trait DispatcherResponseMessage extends DispatcherMessage {
+sealed trait RouterResponseMessage extends RouterMessage {
def error: Option[String]
override def isError = error.isDefined
}
}
-case class RequestUserBalance(userId: String, timestamp: Long) extends DispatcherMessage
+case class RequestUserBalance(userId: String, timestamp: Long) extends RouterMessage
case class BalanceValue(balance: Double) extends JsonSupport
-case class ResponseUserBalance(userId: String, balance: Double, error: Option[String]) extends DispatcherResponseMessage {
+case class ResponseUserBalance(userId: String, balance: Double, error: Option[String]) extends RouterResponseMessage {
def responseBody = BalanceValue(balance)
}
-case class UserResponseGetBalance(userId: String, balance: Double) extends DispatcherResponseMessage {
+case class UserResponseGetBalance(userId: String, balance: Double) extends RouterResponseMessage {
def responseBody = BalanceValue(balance)
def error = None
}
-case class UserRequestGetState(userId: String, timestamp: Long) extends DispatcherMessage
-case class UserResponseGetState(userId: String, state: UserState) extends DispatcherResponseMessage {
+case class UserRequestGetState(userId: String, timestamp: Long) extends RouterMessage
+case class UserResponseGetState(userId: String, state: UserState) extends RouterResponseMessage {
def responseBody = state
val error = None
}
*
* Note that the prefix `Process` means that no reply is created or needed.
*/
-case class ProcessResourceEvent(rcEvent: ResourceEvent) extends DispatcherMessage
+case class ProcessResourceEvent(rcEvent: ResourceEvent) extends RouterMessage
/**
* Dispatcher message that triggers the user event processing pipeline.
*
* Note that the prefix `Process` means that no reply is created or needed.
*/
-case class ProcessIMEvent(imEvent: IMEventModel) extends DispatcherMessage
+case class ProcessIMEvent(imEvent: IMEventModel) extends RouterMessage
-case class AdminRequestPingAll() extends DispatcherMessage
+case class AdminRequestPingAll() extends RouterMessage
import gr.grnet.aquarium.actor.{ReflectiveAquariumActor, PingerRole}
-import message.service.dispatcher.AdminRequestPingAll
+import message.service.router.AdminRequestPingAll
/**
import gr.grnet.aquarium.actor.{RESTRole, AquariumActor, RouterRole}
import RESTPaths.{UserBalancePath, UserStatePath, AdminPingAll}
import com.ckkloverdos.maybe.{NoVal, Just}
-import message.service.dispatcher._
+import message.service.router._
import gr.grnet.aquarium.util.date.TimeHelpers
import org.joda.time.format.ISODateTimeFormat
private[this]
- def callDispatcher(message: DispatcherMessage, responder: RequestResponder): Unit = {
+ def callDispatcher(message: RouterMessage, responder: RequestResponder): Unit = {
val configurator = Configurator.MasterConfigurator
val actorProvider = configurator.actorProvider
val dispatcher = actorProvider.actorForRole(DispatcherRole)
case Some(Right(actualResponse)) ⇒
actualResponse match {
- case dispatcherResponse: DispatcherResponseMessage if (!dispatcherResponse.isError) ⇒
+ case dispatcherResponse: RouterResponseMessage if (!dispatcherResponse.isError) ⇒
//logger.debug("Received response: %s".format(dispatcherResponse))
//logger.debug("Received response (JSON): %s".format(dispatcherResponse.toJson))
//logger.debug("Received response:body %s".format(dispatcherResponse.responseBody))
body = dispatcherResponse.responseBodyToJson.getBytes("UTF-8"),
headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil))
- case dispatcherResponse: DispatcherResponseMessage ⇒
+ case dispatcherResponse: RouterResponseMessage ⇒
logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
package gr.grnet.aquarium.actor
package service
-package dispatcher
+package router
import gr.grnet.aquarium.util.Loggable
import gr.grnet.aquarium.service.ActorProviderService
-import message.config.ActorProviderConfigured
-import message.service.dispatcher._
+import message.service.router._
import akka.actor.ActorRef
import message.config.user.UserActorInitWithUserId
import user.{UserActorCache, UserActorSupervisor}
+import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
/**
- * Business logic dispatcher. Incoming messages are dispatched to appropriate destinations.
+ * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
+ * appropriately.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-class RouterActor extends AquariumActor with Loggable {
+class RouterActor extends ReflectiveAquariumActor {
private[this] var _actorProvider: ActorProviderService = _
def role = RouterRole
- private[this] def _forwardToUserManager(m: DispatcherMessage): Unit = {
+ private[this] def _forwardToUserManager(m: RouterMessage): Unit = {
logger.debug("Received %s".format(m))
val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
// forward to the user actor manager, which in turn will
userActor
}
- private[this] def _forwardToUserActor(userId: String, m: DispatcherMessage): Unit = {
+ private[this] def _forwardToUserActor(userId: String, m: RouterMessage): Unit = {
logger.debug("Received %s".format(m))
UserActorCache.get(userId) match {
case Some(userActor) ⇒
}
}
- protected def receive = {
- case ActorProviderConfigured(actorProvider) ⇒
- this._actorProvider = actorProvider
- logger.info("Received actorProvider = %s".format(this._actorProvider))
+ def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
+ }
+
+ def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
+ this._actorProvider = m.actorProvider
+ logger.info("Configured %s with %s".format(this, m))
+ }
+
+ def onRequestUserBalance(m: RequestUserBalance): Unit = {
+ _forwardToUserActor(m.userId, m)
+ }
- case m@RequestUserBalance(userId, timestamp) ⇒
- _forwardToUserManager(m)
+ def onUserRequestGetState(m: UserRequestGetState): Unit = {
+ _forwardToUserActor(m.userId, m)
+ }
- case m@UserRequestGetState(userId, timestamp) ⇒
- _forwardToUserManager(m)
+ def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
+ _forwardToUserActor(m.rcEvent.userID, m)
+ }
- case m@ProcessResourceEvent(resourceEvent) ⇒
- _forwardToUserManager(m)
+ def onProcessIMEvent(m: ProcessIMEvent): Unit = {
+ _forwardToUserActor(m.imEvent.userID, m)
+ }
- case m@ProcessIMEvent(userEvent) ⇒
- _forwardToUserManager(m)
+ def onAdminRequestPingAll(m: AdminRequestPingAll): Unit = {
+
+ }
- case m@AdminRequestPingAll ⇒
+ override def postStop = {
+ UserActorCache.stop
}
}
\ No newline at end of file
import gr.grnet.aquarium.logic.accounting.RoleAgreements
import gr.grnet.aquarium.messaging.AkkaAMQP
import gr.grnet.aquarium.actor.message.config.user.UserActorInitWithUserId
-import gr.grnet.aquarium.actor.message.service.dispatcher._
+import gr.grnet.aquarium.actor.message.service.router._
import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
import gr.grnet.aquarium.event.im.IMEventModel
import gr.grnet.aquarium.event.{WalletEntry}
import gr.grnet.aquarium.actor._
import message.config.user.UserActorInitWithUserId
import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
-import message.service.dispatcher._
+import message.service.router._
import gr.grnet.aquarium.service.ActorProviderService
userActor
}
- private[this] def _forwardToUserActor(userId: String, m: DispatcherMessage): Unit = {
+ private[this] def _forwardToUserActor(userId: String, m: RouterMessage): Unit = {
logger.debug("Received %s".format(m))
UserActorCache.get(userId) match {
case Some(userActor) ⇒
import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.Configurator.Keys
import gr.grnet.aquarium.store.LocalFSEventStore
-import gr.grnet.aquarium.actor.message.service.dispatcher.ProcessIMEvent
+import gr.grnet.aquarium.actor.message.service.router.ProcessIMEvent
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.util.makeString
import com.ckkloverdos.maybe._
import gr.grnet.aquarium.Configurator.Keys
import gr.grnet.aquarium.store.LocalFSEventStore
import com.ckkloverdos.maybe.{Maybe, Just, Failed, NoVal}
-import gr.grnet.aquarium.actor.message.service.dispatcher.ProcessResourceEvent
+import gr.grnet.aquarium.actor.message.service.router.ProcessResourceEvent
import gr.grnet.aquarium.event.ResourceEvent
import gr.grnet.aquarium.util.date.TimeHelpers