Forward UserState events to dispatcher
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 9 Jan 2012 12:47:40 +0000 (14:47 +0200)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 9 Jan 2012 12:47:57 +0000 (14:47 +0200)
src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala
src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala
src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala
src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala

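diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala
index bceab01..0ab611d 100644
--- a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala
+++ b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala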
@@ -49,30 +49,29 @@ class DispatcherActor extends AquariumActor with Loggable {
 
   def role = DispatcherRole
 
+  private[this] def _forwardToUserManager(m: DispatcherMessage): Unit = {
+    logger.debug("Received %s".format(m))
+    val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
+    // forward to the user actor manager, which in turn will
+    // forward to the appropriate user actor (and create one if it does not exist)
+    userActorManager forward m
+  }
+
   protected def receive = {
     case ActorProviderConfigured(actorProvider) ⇒
       this._actorProvider = actorProvider
       logger.info("Received actorProvider = %s".format(this._actorProvider))
 
     case m @ UserRequestGetBalance(userId, timestamp) ⇒
-      logger.debug("Received %s".format(m))
-      val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
-      // forward to the user actor manager, which in turn will
-      // forward to the appropriate user actor (and create one if it does not exist)
-      userActorManager forward m
+      _forwardToUserManager(m)
 
     case m @ UserRequestGetState(userId, timestamp) ⇒
-      logger.debug("Received %s".format(m))
-      val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
-      // forward to the user actor manager, which in turn will
-      // forward to the appropriate user actor (and create one if it does not exist)
-      userActorManager forward m
+      _forwardToUserManager(m)
 
     case m @ ProcessResourceEvent(resourceEvent) ⇒
-      logger.debug("Received %s".format(m))
-      val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
-      // forward to the user actor manager, which in turn will
-      // forward to the appropriate user actor (and create one if it does not exist)
-      userActorManager forward m
+      _forwardToUserManager(m)
+
+    case m @ ProcessUserEvent(userEvent) ⇒
+      _forwardToUserManager(m)
   }
 }
\ No newline at end of file
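diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala
index 035cab0..ea63da7 100644
--- a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala
+++ b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala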
@@ -38,7 +38,7 @@ package gr.grnet.aquarium.processor.actor
 import gr.grnet.aquarium.actor.ActorMessage
 import gr.grnet.aquarium.user.UserState
 import gr.grnet.aquarium.util.json.{JsonSupport, JsonHelpers}
-import gr.grnet.aquarium.logic.events.ResourceEvent
+import gr.grnet.aquarium.logic.events.{UserEvent, ResourceEvent}
 
 /**
  * This is the base class of the messages the Dispatcher understands.
@@ -73,4 +73,5 @@ case class UserResponseGetState(userId: String, state: UserState) extends Dispat
 }
 
 case class ProcessResourceEvent(rce: ResourceEvent) extends DispatcherMessage
+case class ProcessUserEvent(ue: UserEvent) extends DispatcherMessage
 
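diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala
index 0598160..9e35681 100644
--- a/src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala
+++ b/src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala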
@@ -90,7 +90,7 @@ abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with L
   protected def _configurator: Configurator = Configurator.MasterConfigurator
 
   protected def decode(data: Array[Byte]): E
-  protected def forward(resourceEvent: E): Unit
+  protected def forward(event: E): Unit
   protected def exists(event: E): Boolean
   protected def persist(event: E): Boolean
 
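diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala
index e9637d8..b40178b 100644
--- a/src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala
+++ b/src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala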
@@ -3,6 +3,7 @@ package gr.grnet.aquarium.processor.actor
 import gr.grnet.aquarium.messaging.MessagingNames
 import gr.grnet.aquarium.logic.events.{UserEvent, AquariumEvent}
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
+import gr.grnet.aquarium.actor.DispatcherRole
 
 
 /**
@@ -14,7 +15,10 @@ class UserEventProcessorService extends EventProcessorService[UserEvent] {
 
   override def decode(data: Array[Byte]) = UserEvent.fromBytes(data)
 
-  override def forward(resourceEvent: UserEvent) {}
+  override def forward(event: UserEvent) = {
+    val dispatcher = _configurator.actorProvider.actorForRole(DispatcherRole)
+    dispatcher ! ProcessUserEvent(event)
+  }
 
   override def exists(event: UserEvent) =
     _configurator.userEventStore.findUserEventById(event.id).isJust