Merge branch 'master'
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
index e84402b..7d339a4 100644 (file)
@@ -50,7 +50,6 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest
 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
-import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
 import gr.grnet.aquarium.charging.bill.AbstractBillEntry
 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
 import gr.grnet.aquarium.computation.BillingMonthInfo
@@ -67,6 +66,9 @@ import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
  */
 
 class UserActor extends ReflectiveRoleableActor {
+  private[this] var _isFirstMessage = true
+  private[this] var _rcMsgCount = 0
+  private[this] var _imMsgCount = 0
   private[this] var _userID: String = "<?>"
   private[this] var _userState: UserStateModel = _
   private[this] var _userCreationIMEvent: IMEventMsg = _
@@ -172,18 +174,13 @@ class UserActor extends ReflectiveRoleableActor {
    * Creates the initial state that is related to IMEvents.
    */
   private[this] def initializeStateOfIMEvents(): Unit = {
-    // NOTE: this._userID is already set up by onInitializeUserActorState()
+    // NOTE: this._userID is already set up our caller
     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
       DEBUG("Replaying %s", imEvent)
 
       updateAgreementHistoryFrom(imEvent)
       updateLatestIMEventIDFrom(imEvent)
     }
-
-    if(haveAgreements) {
-      DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString)
-      logSeparator()
-    }
   }
 
   /**
@@ -220,14 +217,14 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
-  private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
+  private[this] def initializeStateOfResourceEvents(): Unit = {
     if(!this.haveAgreements) {
-      DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
+      DEBUG("No agreements to initialize resources state")
       return
     }
 
     if(!this.haveUserCreationIMEvent) {
-      DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
+      DEBUG("No CREATE IMEvent to initialize resources state")
       return
     }
 
@@ -240,12 +237,16 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
-  def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
-    this._userID = event.userID
-    DEBUG("Got %s", event)
+  /**
+   * Initializes the actor state from DB.
+   */
+  def initializeUserActorState(userID: String): Unit = {
+    this._userID = userID
+
+    DEBUG("Initializing user actor state")
 
     initializeStateOfIMEvents()
-    initializeStateOfResourceEvents(event)
+    initializeStateOfResourceEvents()
   }
 
   /**
@@ -253,26 +254,21 @@ class UserActor extends ReflectiveRoleableActor {
    * When this method is called, we assume that all proper checks have been made and it
    * is OK to proceed with the event processing.
    */
-  def onIMEventMsg(imEvent: IMEventMsg): Unit = {
+  def onIMEventMsg(imEvent: IMEventMsg) {
+    if(this._isFirstMessage) {
+      initializeUserActorState(imEvent.getUserID)
+      // we ignore this event, since it is already saved in the store and all messages in
+      // the store have been consulted by initializeUserActorState()
+      this._isFirstMessage = false
+      return
+    }
+
     val hadUserCreationIMEvent = haveUserCreationIMEvent
 
     if(!haveAgreements) {
-      // This IMEvent has arrived after any ResourceEvents
-      INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
       initializeStateOfIMEvents()
     }
     else {
-      if(this._latestIMEventOriginalID == imEvent.getOriginalID) {
-        // This happens when the actor is brought to life, then immediately initialized, and then
-        // sent the first IM event. But from the initialization procedure, this IM event will have
-        // already been loaded from DB!
-        INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
-        logSeparator()
-
-        //this._latestIMEventID = imEvent.id
-        return
-      }
-
       updateAgreementHistoryFrom(imEvent)
       updateLatestIMEventIDFrom(imEvent)
     }
@@ -286,7 +282,15 @@ class UserActor extends ReflectiveRoleableActor {
     logSeparator()
   }
 
-  def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
+  def onResourceEventMsg(rcEvent: ResourceEventMsg) {
+    if(this._isFirstMessage) {
+      initializeUserActorState(rcEvent.getUserID)
+      // we ignore this event, since it is already saved in the store and all messages in
+      // the store have been consulted by initializeUserActorState()
+      this._isFirstMessage = false
+      return
+    }
+
     if(!shouldProcessResourceEvents) {
       // This means the user has not been created (at least, as far as Aquarium is concerned).
       // So, we do not process any resource event
@@ -296,17 +300,6 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    // Since the latest resource event per resource is recorded in the user state,
-    // we do not need to query the store. Just query the in-memory state.
-    // Note: This is a similar situation with the first IMEvent received right after the user
-    //       actor is created.
-    if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) {
-      INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
-      logSeparator()
-
-      return
-    }
-
     val now = TimeHelpers.nowMillis()
     // TODO: Review this and its usage in user state.
     // TODO: The assumption is that the resource set increases all the time,