WIP: IMEventModel end-to-end chain
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
index 2facf1f..67c4abb 100644 (file)
@@ -41,12 +41,12 @@ import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.user._
 
 import gr.grnet.aquarium.util.shortClassNameOf
-import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.actor.message.service.router._
 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
 import gr.grnet.aquarium.event.im.IMEventModel
 import akka.config.Supervision.Temporary
 import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator}
+import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
 
 
 /**
@@ -55,14 +55,15 @@ import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _userID: String = _
+  private[this] var _imState: IMStateSnapshot = _
   private[this] var _userState: UserState = _
 
   self.lifeCycle = Temporary
 
+  private[this] def _userID = this._userState.userID // reads through _userState, which starts out unset: check _haveUserState before calling
   private[this] def _shutmedown(): Unit = {
-    if(_haveFullState) {
-      UserActorCache.invalidate(this._userID)
+    if(_haveUserState) {
+      UserActorCache.invalidate(_userID)
     }
 
     self.stop()
@@ -78,21 +79,19 @@ class UserActor extends ReflectiveRoleableActor {
   def role = UserActorRole
 
   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
-//  private[this] def _userId = _userState.userId
 
   private[this] def _timestampTheshold =
     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
 
 
-  private[this] def _haveFullState = {
-    (this._userID ne null) && (this._userState ne null)
+  private[this] def _haveUserState = { // true once _userState holds a non-null value
+    this._userState ne null
   }
 
-  private[this] def _havePartialState = {
-    (this._userID ne null) && (this._userState eq null)
+  private[this] def _haveIMState = { // true once an IMStateSnapshot has been stored in _imState
+    this._imState ne null
   }
 
-
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
@@ -104,103 +103,30 @@ class UserActor extends ReflectiveRoleableActor {
     "default"
   }
 
-  private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
-    this._userID = imEvent.userID
-
-    val store = _configurator.storeProvider.userStateStore
-    // try find user state. normally should ot exist
-    val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
-    if(latestUserStateOpt.isDefined) {
-      logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
-        this._userID,
-        shortClassNameOf(imEvent),
-        imEvent.eventType))
-
-      return
-    }
-
-    val initialAgreementName = _getAgreementNameForNewUser(imEvent)
-    val newUserState    = DefaultUserStateComputations.createInitialUserState(
-      this._userID,
-      imEvent.occurredMillis,
-      imEvent.isActive,
-      0.0,
-      List(imEvent.role),
-      initialAgreementName)
-
-    this._userState = newUserState
-
-    // FIXME: If this fails, then the actor must be shut down.
-    store.insertUserState(newUserState)
-  }
-
-  private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
-    val now = TimeHelpers.nowMillis()
-
-    if(!_haveFullState) {
-      ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
-      _shutmedown()
-      return
-    }
-
-    this._userState = this._userState.modifyFromIMEvent(imEvent, now)
-  }
-
-  def onProcessSetUserID(event: ProcessSetUserID): Unit = {
-    this._userID = event.userID
-  }
-
   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
     val now = TimeHelpers.nowMillis()
 
     val imEvent = event.imEvent
-    // If we already have a userID but it does not match the incoming userID, then this is an internal error
-    if(_havePartialState && (this._userID != imEvent.userID)) {
-      throw new AquariumInternalError(
-        "Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
-    }
-
-    // If we get an IMEvent without having a user state, then we query for the latest user state.
-    if(!_haveFullState) {
-      val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
-      this._userState = userStateOpt match {
-        case Some(userState) ⇒
-          userState
-
-        case None ⇒
-          val initialAgreementName = _getAgreementNameForNewUser(imEvent)
-          val initialUserState = DefaultUserStateComputations.createInitialUserState(
-            this._userID,
-            imEvent.occurredMillis,
-            imEvent.isActive,
-            0.0,
-            List(imEvent.role),
-            initialAgreementName)
-
-          DEBUG("Got initial state")
-          initialUserState
-      }
-    }
+    val isUpdate = if(_haveIMState) { // true when a stored IM snapshot is being replaced, false for the first one
+      val newOccurredMillis = imEvent.occurredMillis
+      val currentOccurredMillis = this._imState.imEvent.occurredMillis
 
-    if(imEvent.isModifyUser && this._userState.isInitial) {
-      INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
-      return
-    }
-
-    if(imEvent.isCreateUser && !this._userState.isInitial) {
-      INFO("Got a '%s' but my state is not initial", imEvent.eventType)
-      return
-    }
+      if(newOccurredMillis < currentOccurredMillis) { // only strictly older events are rejected; an equal timestamp still replaces the snapshot
+        INFO(
+          "Ignoring older IMEvent: [%s] < [%s]",
+          new MutableDateCalc(newOccurredMillis).toYYYYMMDDHHMMSSSSS,
+          new MutableDateCalc(currentOccurredMillis).toYYYYMMDDHHMMSSSSS)
 
-    this._userState = this._userState.modifyFromIMEvent(imEvent, now)
+        return // leave the currently stored snapshot untouched
+      }
 
-    if(imEvent.isCreateUser) {
-      processCreateUser(imEvent)
-    } else if(imEvent.isModifyUser) {
-      processModifyUser(imEvent)
+      true // an existing snapshot is about to be overwritten
     } else {
-      throw new AquariumException("Cannot interpret %s".format(imEvent))
+      false // no snapshot yet: this event becomes the first one
     }
+
+    this._imState = IMStateSnapshot(imEvent, now) // `now` is the timestamp taken at the top of this handler
+    DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
   }
 
   def onRequestUserBalance(event: RequestUserBalance): Unit = {
@@ -220,13 +146,13 @@ class UserActor extends ReflectiveRoleableActor {
 
 
   private[this] def D_userID = {
-    if(this._userID eq null)
-      "<NOT INITIALIZED>" // We always get a userID first
-    else
-      if(this._userState eq null)
-        "%s, NO STATE".format(this._userID)
+    if(this._userState eq null) // no user state: fall back to the IM snapshot, if one exists
+      if(this._imState eq null)
+        "<NOT INITIALIZED>" // neither the user state nor the IM snapshot has been set
       else
-        "%s".format(this._userID)
+        this._imState.imEvent.userID // userID carried by the stored IMEvent
+    else
+      this._userState.userID // the user state wins whenever it is present
   }
 
   private[this] def DEBUG(fmt: String, args: Any*) =
@@ -242,5 +168,5 @@ class UserActor extends ReflectiveRoleableActor {
     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
 
   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
-      logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t) // message is prefixed with D_userID; t is passed to the logger as the cause
 }