Change the representation of computed credit values
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
index b19a4da..3fde4f6 100644 (file)
@@ -37,7 +37,7 @@ package gr.grnet.aquarium.actor
 package service
 package user
 
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{Real, AquariumInternalError}
 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
@@ -51,7 +51,6 @@ import gr.grnet.aquarium.actor.message.GetUserWalletResponse
 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
-import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg}
 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
 import gr.grnet.aquarium.service.event.BalanceEvent
@@ -59,8 +58,6 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
 import gr.grnet.aquarium.charging.bill.BillEntryMsg
-import gr.grnet.aquarium.event.CreditsModel
-import java.util
 
 /**
  *
@@ -68,18 +65,16 @@ import java.util
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _rcMsgCount = 0
   private[this] var _imMsgCount = 0
-  private[this] var _userID: String = "???"
   private[this] var _userStateMsg: UserStateMsg = _
   private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
 
-  def unsafeUserID = {
-    if(!haveUserID) {
+  def userID = {
+    if(!haveUserState) {
       throw new AquariumInternalError("%s not initialized")
     }
 
-    this._userID
+    this._userStateMsg.getUserID
   }
 
   override def postStop() {
@@ -88,7 +83,7 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   private[this] def shutmedown(): Unit = {
-    if(haveUserID) {
+    if(haveUserState) {
       aquarium.akkaService.invalidateUserActor(this)
     }
   }
@@ -104,34 +99,14 @@ class UserActor extends ReflectiveRoleableActor {
 
   private[this] def chargingService = aquarium.chargingService
 
-  private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
-    aquarium.userStateStore.insertUserState(userState)
-  }
-
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
-  private[this] def haveUserID = this._userID ne null
   private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent
   private[this] def haveAgreements = this._userAgreementHistoryModel ne null
   private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent
   private[this] def haveUserState = this._userStateMsg ne null
 
-  private[this] def createInitialUserStateMsgFromCreateIMEvent() {
-    assert(haveAgreements, "haveAgreements")
-    assert(isUserCreated, "isUserCreated")
-    assert(this._userAgreementHistoryModel.hasUserCreationEvent, "this._userAgreementHistoryModel.hasUserCreationEvent")
-
-    val userCreationIMEventMsg = unsafeUserCreationIMEventMsg
-    val userStateBootstrap = aquarium.getUserStateBootstrap(userCreationIMEventMsg)
-
-    this._userStateMsg = MessageFactory.newInitialUserStateMsg(
-      this._userID,
-      CreditsModel.from(0.0),
-      TimeHelpers.nowMillis()
-    )
-  }
-
   /**
    * Creates the agreement history from all the stored IMEvents.
    *
@@ -139,12 +114,14 @@ class UserActor extends ReflectiveRoleableActor {
    */
   private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = {
     DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
-    val historyMsg = MessageFactory.newUserAgreementHistoryMsg(this._userID)
+    assert(haveUserState, "haveUserState")
+
+    val historyMsg = MessageFactory.newUserAgreementHistoryMsg(userID)
     this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg)
 
     var _imcounter = 0
 
-    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
+    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
       _imcounter += 1
       DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
 
@@ -219,86 +196,26 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    val now = TimeHelpers.nowMillis()
-    val resourceMapping = aquarium.resourceMappingAtMillis(now)
-
-    val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
-    val nowYear = nowBillingMonthInfo.year
-    val nowMonth = nowBillingMonthInfo.month
+    assert(haveUserState, "haveUserState")
 
-    val eventOccurredMillis = rcEvent.getOccurredMillis
-    val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
-    val eventYear = eventBillingMonthInfo.year
-    val eventMonth = eventBillingMonthInfo.month
+    val oldTotalCredits = Real(this._userStateMsg.getTotalCredits)
 
-    def computeBatch(): Unit = {
-      DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
-
-      this._userStateMsg = chargingService.replayMonthChargingUpTo(
-        this._userAgreementHistoryModel,
-        nowBillingMonthInfo,
-        // Take into account that the event may be out-of-sync.
-        // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
-        now max eventOccurredMillis,
-        resourceMapping,
-        stdUserStateStoreFunc
-      )
+    chargingService.processResourceEvent(
+      rcEvent.getReceivedMillis,
+      rcEvent,
+      this._userAgreementHistoryModel,
+      this._userStateMsg,
+      aquarium.currentResourceMapping,
+      true
+    )
 
-    }
+    val newTotalCredits = Real(this._userStateMsg.getTotalCredits)
 
-    def computeRealtime(): Unit = {
-      DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
-      chargingService.processResourceEvent(
-        rcEvent,
-        this._userAgreementHistoryModel,
-        this._userStateMsg,
-        nowBillingMonthInfo,
-        true,
-        resourceMapping
-      )
-
-      this._rcMsgCount += 1
+    if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
+      aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
     }
 
-    val oldTotalCredits =
-      if(this._userStateMsg!=null)
-        this._userStateMsg.totalCredits
-      else
-        0.0D
-    // FIXME check these
-    if(this._userStateMsg eq null) {
-      computeBatch()
-    }
-    else if(nowYear != eventYear || nowMonth != eventMonth) {
-      DEBUG(
-        "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
-        nowYear, eventYear,
-        nowMonth, eventMonth
-      )
-      computeBatch()
-    }
-    else if(this._userStateMsg.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
-      DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
-      DEBUG(
-        "%s < %s",
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
-      )
-      computeRealtime()
-    }
-    else {
-      DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
-
-      computeBatch()
-    }
-    val newTotalCredits = this._userStateMsg.totalCredits
-    if(oldTotalCredits * newTotalCredits < 0)
-      aquarium.eventBus ! new BalanceEvent(this._userStateMsg.userID,
-        newTotalCredits>=0)
     DEBUG("Updated %s", this._userStateMsg)
-    logSeparator()
   }
 
   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
@@ -312,10 +229,10 @@ class UserActor extends ReflectiveRoleableActor {
           case Some(policy:PolicyModel) => policy.resourceTypesMap
       }
       val state= if(haveUserState) Some(this._userStateMsg) else None
-      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes)
+      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
       //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
       //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
-      val billData = GetUserBillResponseData(this._userID,billEntryMsg)
+      val billData = GetUserBillResponseData(this.userID,billEntryMsg)
       sender ! GetUserBillResponse(Right(billData))
     } catch {
       case e:Exception =>
@@ -334,12 +251,11 @@ class UserActor extends ReflectiveRoleableActor {
         chargingService.calculateRealtimeUserState(
           this._userAgreementHistoryModel,
           this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
-        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userStateMsg.totalCredits)))
+        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateMsg.getTotalCredits)))
 
       case (true, false) ⇒
         // (User CREATEd, no balance state)
@@ -347,8 +263,8 @@ class UserActor extends ReflectiveRoleableActor {
         sender ! GetUserBalanceResponse(
           Right(
             GetUserBalanceResponseData(
-              this._userID,
-              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)
+              this.userID,
+              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
             )))
 
       case (false, true) ⇒
@@ -370,8 +286,7 @@ class UserActor extends ReflectiveRoleableActor {
         chargingService.calculateRealtimeUserState(
           this._userAgreementHistoryModel,
           this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
@@ -390,16 +305,15 @@ class UserActor extends ReflectiveRoleableActor {
         chargingService.calculateRealtimeUserState(
           this._userAgreementHistoryModel,
           this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
         sender ! GetUserWalletResponse(
           Right(
             GetUserWalletResponseData(
-              this._userID,
-              this._userStateMsg.totalCredits,
+              this.userID,
+              this._userStateMsg.getTotalCredits,
               MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries)
             )))
 
@@ -411,8 +325,8 @@ class UserActor extends ReflectiveRoleableActor {
             sender ! GetUserWalletResponse(
               Right(
                 GetUserWalletResponseData(
-                  this._userID,
-                  aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis),
+                  this.userID,
+                  Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
                   MessageFactory.newWalletEntriesMsg()
                 )))
 
@@ -424,11 +338,15 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onSetUserActorUserID(userID: String) {
-    this._userID = userID
+    this._userStateMsg = MessageFactory.newInitialUserStateMsg(
+      userID,
+      Real.Zero,
+      TimeHelpers.nowMillis()
+    )
   }
 
   private[this] def D_userID = {
-    this._userID
+    if(haveUserState) userID else "???"
   }
 
   private[this] def DEBUG(fmt: String, args: Any*) =