One state to rule them all
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
index 3fde4f6..5840379 100644 (file)
@@ -50,9 +50,9 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest
 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
-import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
-import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg}
-import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
+import gr.grnet.aquarium.charging.state.UserStateModel
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
 import gr.grnet.aquarium.service.event.BalanceEvent
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
@@ -66,15 +66,14 @@ import gr.grnet.aquarium.charging.bill.BillEntryMsg
 
 class UserActor extends ReflectiveRoleableActor {
   private[this] var _imMsgCount = 0
-  private[this] var _userStateMsg: UserStateMsg = _
-  private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
+  private[this] var _userStateModel: UserStateModel = _
 
   def userID = {
     if(!haveUserState) {
       throw new AquariumInternalError("%s not initialized")
     }
 
-    this._userStateMsg.getUserID
+    this._userStateModel.userID
   }
 
   override def postStop() {
@@ -102,22 +101,32 @@ class UserActor extends ReflectiveRoleableActor {
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
-  private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent
-  private[this] def haveAgreements = this._userAgreementHistoryModel ne null
-  private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent
-  private[this] def haveUserState = this._userStateMsg ne null
+  // Delegates to the state model's unsafeUserCreationIMEvent; the model is not null-checked here.
+  private[this] def unsafeUserCreationIMEventMsg =
+    this._userStateModel.unsafeUserCreationIMEvent
+
+  // True once the state model has been assigned (i.e. is not null).
+  private[this] def haveAgreements =
+    this._userStateModel ne null
+
+  // True when the state model exists and reports a user creation event;
+  // the model is only dereferenced after the null check has passed.
+  private[this] def haveUserCreationEvent =
+    haveAgreements && this._userStateModel.hasUserCreationEvent
+
+  // True once the user state model has been assigned (i.e. is not null).
+  private[this] def haveUserState =
+    this._userStateModel ne null
 
   /**
    * Creates the agreement history from all the stored IMEvents.
    *
    * @return (`true` iff there was a user CREATE event, the number of events processed)
    */
-  private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = {
+  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
     DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
     assert(haveUserState, "haveUserState")
 
-    val historyMsg = MessageFactory.newUserAgreementHistoryMsg(userID)
-    this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg)
 
     var _imcounter = 0
 
@@ -140,31 +149,51 @@ class UserActor extends ReflectiveRoleableActor {
           "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
         )
 
-        this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
+        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
         true
       }
     }
 
-    DEBUG("Agreements: %s", this._userAgreementHistoryModel)
+    this._imMsgCount = _imcounter
+
+    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
     (hadCreateEvent, _imcounter)
   }
 
+  // Flags the current state message as first, inserts it into the store and hands the store's result back to the model.
+  private[this] def saveFirstUserState(userID: String): Unit = { // NOTE(review): userID is not used in this body
+    val model = this._userStateModel
+    model.userStateMsg.setIsFirst(true)
+    model.updateUserStateMsg(aquarium.userStateStore.insertUserState(model.userStateMsg))
+  }
+
+  // Flags the current state message as not-first, inserts it into the store and hands the store's result back to the model.
+  private[this] def saveSubsequentUserState(): Unit = {
+    val model = this._userStateModel
+    model.userStateMsg.setIsFirst(false)
+    model.updateUserStateMsg(aquarium.userStateStore.insertUserState(model.userStateMsg))
+  }
+
+  // Looks up the latest stored state for the history's user. If the store has one, that
+  // state and the given agreement history are installed in the model; otherwise the
+  // in-memory state is saved as the first one.
+  private[this] def loadLastKnownUserStateAndUpdateAgreements(historyMsg: UserAgreementHistoryMsg): Unit = {
+    val userID = historyMsg.getUserID
+    aquarium.userStateStore.findLatestUserState(userID) match {
+      case Some(storedState) ⇒
+        this._userStateModel.updateUserStateMsg(storedState)
+        this._userStateModel.updateUserAgreementHistoryMsg(historyMsg)
+      case None ⇒
+        saveFirstUserState(userID) // nothing stored yet: first user state ever
+    }
+  }
   /**
    * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
    * messaging hub (rabbitmq).
    */
   def onIMEventMsg(imEvent: IMEventMsg) {
-    if(!isUserCreated && MessageHelpers.isIMEventCreate(imEvent)) {
-      assert(this._imMsgCount == 0, "this._imMsgCount == 0")
-      // Create the full agreement history from the original sources (IMEvents)
-      val (userCreated, imEventsCount) = createUserAgreementHistoryFromStoredIMEvents()
-
-      this._imMsgCount = imEventsCount
-      return
-    }
-
     // Check for out of sync (regarding IMEvents)
-    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userAgreementHistoryModel.latestIMEventOccurredMillis
+    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
     if(isOutOfSyncIM) {
       // clear all resource state
       // FIXME implement
@@ -180,17 +209,19 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    // OK, seems good
-    assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
-
     // Make new agreement
-    this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
+    this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
     this._imMsgCount += 1
-    DEBUG("Agreements: %s", this._userAgreementHistoryModel)
+
+    if(haveUserCreationEvent) {
+      loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg)
+    }
+
+    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
   }
 
   def onResourceEventMsg(rcEvent: ResourceEventMsg) {
-    if(!isUserCreated) {
+    if(!haveUserCreationEvent) {
       DEBUG("No agreements. Ignoring %s", rcEvent)
 
       return
@@ -198,24 +229,23 @@ class UserActor extends ReflectiveRoleableActor {
 
     assert(haveUserState, "haveUserState")
 
-    val oldTotalCredits = Real(this._userStateMsg.getTotalCredits)
+    val oldTotalCredits = this._userStateModel.totalCreditsAsReal
 
     chargingService.processResourceEvent(
       rcEvent.getReceivedMillis,
       rcEvent,
-      this._userAgreementHistoryModel,
-      this._userStateMsg,
+      this._userStateModel,
       aquarium.currentResourceMapping,
       true
     )
 
-    val newTotalCredits = Real(this._userStateMsg.getTotalCredits)
+    val newTotalCredits = this._userStateModel.totalCreditsAsReal
 
     if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
       aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
     }
 
-    DEBUG("Updated %s", this._userStateMsg)
+    DEBUG("Updated %s", this._userStateModel)
   }
 
   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
@@ -228,7 +258,7 @@ class UserActor extends ReflectiveRoleableActor {
           case None => Map[String,ResourceType]()
           case Some(policy:PolicyModel) => policy.resourceTypesMap
       }
-      val state= if(haveUserState) Some(this._userStateMsg) else None
+      val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
       val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
       //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
       //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
@@ -249,13 +279,12 @@ class UserActor extends ReflectiveRoleableActor {
         // (User CREATEd, with balance state)
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
+          this._userStateModel,
           aquarium.currentResourceMapping,
           realtimeMillis
         )
 
-        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateMsg.getTotalCredits)))
+        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))
 
       case (true, false) ⇒
         // (User CREATEd, no balance state)
@@ -284,13 +313,12 @@ class UserActor extends ReflectiveRoleableActor {
       case true ⇒
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
+          this._userStateModel,
           aquarium.currentResourceMapping,
           realtimeMillis
         )
 
-        sender ! GetUserStateResponse(Right(this._userStateMsg))
+        sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
 
       case false ⇒
         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
@@ -303,8 +331,7 @@ class UserActor extends ReflectiveRoleableActor {
         DEBUG("haveWorkingUserState: %s", event)
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
+          this._userStateModel,
           aquarium.currentResourceMapping,
           realtimeMillis
         )
@@ -313,8 +340,8 @@ class UserActor extends ReflectiveRoleableActor {
           Right(
             GetUserWalletResponseData(
               this.userID,
-              this._userStateMsg.getTotalCredits,
-              MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries)
+              this._userStateModel.totalCredits,
+              MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
             )))
 
       case false ⇒
@@ -337,12 +364,23 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
+  /**
+   * Initializes the actor's internal state: builds the initial state model, recreates the
+   * agreement history from IMEvents and, if a user CREATE event was seen, loads stored state.
+   * @param userID the ID handed to the initial state model and to the agreement-history rebuild
+   */
   def onSetUserActorUserID(userID: String) {
-    this._userStateMsg = MessageFactory.newInitialUserStateMsg(
+    this._userStateModel = ModelFactory.newInitialUserStateModel(
       userID,
-      Real.Zero,
+      Real(0),
       TimeHelpers.nowMillis()
     )
+
+    // Create the full agreement history from the original sources (IMEvents)
+    val (userCreated, _) = createUserAgreementHistoryFromIMEvents(userID)
+    if(userCreated) {
+      loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg)
+    }
   }
 
   private[this] def D_userID = {