Quick commit before delving into python stuff. See you later.
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
index b19a4da..d7604cc 100644 (file)
@@ -37,7 +37,7 @@ package gr.grnet.aquarium.actor
 package service
 package user
 
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{Real, AquariumInternalError}
 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
@@ -50,17 +50,14 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest
 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
-import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
-import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg}
-import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
+import gr.grnet.aquarium.charging.state.UserStateModel
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
 import gr.grnet.aquarium.service.event.BalanceEvent
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
 import gr.grnet.aquarium.charging.bill.BillEntryMsg
-import gr.grnet.aquarium.event.CreditsModel
-import java.util
 
 /**
  *
@@ -68,18 +65,15 @@ import java.util
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _rcMsgCount = 0
   private[this] var _imMsgCount = 0
-  private[this] var _userID: String = "???"
-  private[this] var _userStateMsg: UserStateMsg = _
-  private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
+  private[this] var _userStateModel: UserStateModel = _
 
-  def unsafeUserID = {
-    if(!haveUserID) {
+  def userID = {
+    if(!haveUserState) {
       throw new AquariumInternalError("%s not initialized")
     }
 
-    this._userID
+    this._userStateModel.userID
   }
 
   override def postStop() {
@@ -88,7 +82,7 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   private[this] def shutmedown(): Unit = {
-    if(haveUserID) {
+    if(haveUserState) {
       aquarium.akkaService.invalidateUserActor(this)
     }
   }
@@ -104,51 +98,45 @@ class UserActor extends ReflectiveRoleableActor {
 
   private[this] def chargingService = aquarium.chargingService
 
-  private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
-    aquarium.userStateStore.insertUserState(userState)
-  }
-
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
-  private[this] def haveUserID = this._userID ne null
-  private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent
-  private[this] def haveAgreements = this._userAgreementHistoryModel ne null
-  private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent
-  private[this] def haveUserState = this._userStateMsg ne null
+  private[this] def unsafeUserCreationIMEventMsg = {
+    this._userStateModel.unsafeUserCreationIMEvent
+  }
 
-  private[this] def createInitialUserStateMsgFromCreateIMEvent() {
-    assert(haveAgreements, "haveAgreements")
-    assert(isUserCreated, "isUserCreated")
-    assert(this._userAgreementHistoryModel.hasUserCreationEvent, "this._userAgreementHistoryModel.hasUserCreationEvent")
+  private[this] def haveAgreements = {
+    (this._userStateModel ne null)
+  }
 
-    val userCreationIMEventMsg = unsafeUserCreationIMEventMsg
-    val userStateBootstrap = aquarium.getUserStateBootstrap(userCreationIMEventMsg)
+  private[this] def haveUserCreationEvent = {
+    haveAgreements &&
+    this._userStateModel.hasUserCreationEvent
+  }
 
-    this._userStateMsg = MessageFactory.newInitialUserStateMsg(
-      this._userID,
-      CreditsModel.from(0.0),
-      TimeHelpers.nowMillis()
-    )
+  private[this] def haveUserState = {
+    (this._userStateModel ne null)
   }
 
+  private[this] def isInitial = this._userStateModel.isInitial
+
   /**
    * Creates the agreement history from all the stored IMEvents.
    *
    * @return (`true` iff there was a user CREATE event, the number of events processed)
    */
-  private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = {
+  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
     DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
-    val historyMsg = MessageFactory.newUserAgreementHistoryMsg(this._userID)
-    this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg)
+    assert(haveUserState, "haveUserState")
+
 
     var _imcounter = 0
 
-    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
+    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
       _imcounter += 1
       DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
 
-      if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
+      if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
         // The very first event must be a CREATE event. Otherwise we abort initialization.
         // This will normally happen during devops :)
         INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
@@ -163,31 +151,84 @@ class UserActor extends ReflectiveRoleableActor {
           "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
         )
 
-        this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
+        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
         true
       }
     }
 
-    DEBUG("Agreements: %s", this._userAgreementHistoryModel)
+    this._imMsgCount = _imcounter
+
+    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
     (hadCreateEvent, _imcounter)
   }
 
+  private[this] def saveFirstUserState(userID: String) {
+    this._userStateModel.userStateMsg.setIsFirst(true)
+    this._userStateModel.updateUserStateMsg(
+      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
+    )
+  }
+
+  private[this] def saveSubsequentUserState() {
+    this._userStateModel.userStateMsg.setIsFirst(false)
+    this._userStateModel.updateUserStateMsg(
+      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
+    )
+  }
+
+  private[this] def loadLastKnownUserStateAndUpdateAgreements() {
+    val userID = this._userStateModel.userID
+    aquarium.userStateStore.findLatestUserState(userID) match {
+      case None ⇒
+        // First user state ever
+        saveFirstUserState(userID)
+
+      case Some(latestUserState) ⇒
+        this._userStateModel.updateUserStateMsg(latestUserState)
+    }
+  }
+
+  private[this] def processResourceEventsAfterLastKnownUserState() {
+    // Update the user state snapshot with fresh (ie not previously processed) events.
+    aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod(
+      this._userStateModel.userID,
+      this._userStateModel.latestResourceEventOccurredMillis,
+      TimeHelpers.nowMillis()
+    )
+  }
+
+  private[this] def makeUserStateMsgUpToDate() {
+    loadLastKnownUserStateAndUpdateAgreements()
+    processResourceEventsAfterLastKnownUserState()
+  }
+
+  private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
+    if(!isInitial) {
+      return false
+    }
+
+    val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID)
+
+    if(userCreated) {
+      makeUserStateMsgUpToDate()
+    }
+
+    nextThing()
+
+    true
+  }
+
   /**
    * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
    * messaging hub (rabbitmq).
    */
   def onIMEventMsg(imEvent: IMEventMsg) {
-    if(!isUserCreated && MessageHelpers.isIMEventCreate(imEvent)) {
-      assert(this._imMsgCount == 0, "this._imMsgCount == 0")
-      // Create the full agreement history from the original sources (IMEvents)
-      val (userCreated, imEventsCount) = createUserAgreementHistoryFromStoredIMEvents()
-
-      this._imMsgCount = imEventsCount
+    if(checkInitial()) {
       return
     }
 
     // Check for out of sync (regarding IMEvents)
-    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userAgreementHistoryModel.latestIMEventOccurredMillis
+    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
     if(isOutOfSyncIM) {
       // clear all resource state
       // FIXME implement
@@ -203,105 +244,52 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    // OK, seems good
-    assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
-
     // Make new agreement
-    this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
+    this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
     this._imMsgCount += 1
-    DEBUG("Agreements: %s", this._userAgreementHistoryModel)
+
+    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
+      makeUserStateMsgUpToDate()
+    }
+
+    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
   }
 
   def onResourceEventMsg(rcEvent: ResourceEventMsg) {
-    if(!isUserCreated) {
-      DEBUG("No agreements. Ignoring %s", rcEvent)
-
+    if(checkInitial()) {
       return
     }
 
-    val now = TimeHelpers.nowMillis()
-    val resourceMapping = aquarium.resourceMappingAtMillis(now)
-
-    val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
-    val nowYear = nowBillingMonthInfo.year
-    val nowMonth = nowBillingMonthInfo.month
+    if(!haveUserCreationEvent) {
+      DEBUG("No agreements. Ignoring %s", rcEvent)
 
-    val eventOccurredMillis = rcEvent.getOccurredMillis
-    val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
-    val eventYear = eventBillingMonthInfo.year
-    val eventMonth = eventBillingMonthInfo.month
+      return
+    }
 
-    def computeBatch(): Unit = {
-      DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
+    assert(haveUserState, "haveUserState")
 
-      this._userStateMsg = chargingService.replayMonthChargingUpTo(
-        this._userAgreementHistoryModel,
-        nowBillingMonthInfo,
-        // Take into account that the event may be out-of-sync.
-        // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
-        now max eventOccurredMillis,
-        resourceMapping,
-        stdUserStateStoreFunc
-      )
+    val oldTotalCredits = this._userStateModel.totalCreditsAsReal
 
-    }
+    chargingService.processResourceEvent(
+      rcEvent.getReceivedMillis,
+      rcEvent,
+      this._userStateModel,
+      aquarium.currentResourceMapping,
+      true
+    )
 
-    def computeRealtime(): Unit = {
-      DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
-      chargingService.processResourceEvent(
-        rcEvent,
-        this._userAgreementHistoryModel,
-        this._userStateMsg,
-        nowBillingMonthInfo,
-        true,
-        resourceMapping
-      )
-
-      this._rcMsgCount += 1
-    }
+    val newTotalCredits = this._userStateModel.totalCreditsAsReal
 
-    val oldTotalCredits =
-      if(this._userStateMsg!=null)
-        this._userStateMsg.totalCredits
-      else
-        0.0D
-    // FIXME check these
-    if(this._userStateMsg eq null) {
-      computeBatch()
-    }
-    else if(nowYear != eventYear || nowMonth != eventMonth) {
-      DEBUG(
-        "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
-        nowYear, eventYear,
-        nowMonth, eventMonth
-      )
-      computeBatch()
-    }
-    else if(this._userStateMsg.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
-      DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
-      DEBUG(
-        "%s < %s",
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
-      )
-      computeRealtime()
+    if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
+      aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
     }
-    else {
-      DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
-        TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
 
-      computeBatch()
-    }
-    val newTotalCredits = this._userStateMsg.totalCredits
-    if(oldTotalCredits * newTotalCredits < 0)
-      aquarium.eventBus ! new BalanceEvent(this._userStateMsg.userID,
-        newTotalCredits>=0)
-    DEBUG("Updated %s", this._userStateMsg)
-    logSeparator()
+    DEBUG("Updated %s", this._userStateModel)
   }
 
   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
+    checkInitial()
+
     try{
       val timeslot = event.timeslot
       val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
@@ -311,11 +299,11 @@ class UserActor extends ReflectiveRoleableActor {
           case None => Map[String,ResourceType]()
           case Some(policy:PolicyModel) => policy.resourceTypesMap
       }
-      val state= if(haveUserState) Some(this._userStateMsg) else None
-      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes)
+      val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
+      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
       //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
       //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
-      val billData = GetUserBillResponseData(this._userID,billEntryMsg)
+      val billData = GetUserBillResponseData(this.userID,billEntryMsg)
       sender ! GetUserBillResponse(Right(billData))
     } catch {
       case e:Exception =>
@@ -325,6 +313,8 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
+    checkInitial()
+
     val userID = event.userID
 
     (haveAgreements, haveUserState) match {
@@ -332,14 +322,12 @@ class UserActor extends ReflectiveRoleableActor {
         // (User CREATEd, with balance state)
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          this._userStateModel,
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
-        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userStateMsg.totalCredits)))
+        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))
 
       case (true, false) ⇒
         // (User CREATEd, no balance state)
@@ -347,8 +335,8 @@ class UserActor extends ReflectiveRoleableActor {
         sender ! GetUserBalanceResponse(
           Right(
             GetUserBalanceResponseData(
-              this._userID,
-              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)
+              this.userID,
+              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
             )))
 
       case (false, true) ⇒
@@ -364,18 +352,18 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
+    checkInitial()
+
     haveUserState match {
       case true ⇒
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          this._userStateModel,
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
-        sender ! GetUserStateResponse(Right(this._userStateMsg))
+        sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
 
       case false ⇒
         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
@@ -383,24 +371,24 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
+    checkInitial()
+
     haveUserState match {
       case true ⇒
         DEBUG("haveWorkingUserState: %s", event)
         val realtimeMillis = TimeHelpers.nowMillis()
         chargingService.calculateRealtimeUserState(
-          this._userAgreementHistoryModel,
-          this._userStateMsg,
-          BillingMonthInfo.fromMillis(realtimeMillis),
-          aquarium.resourceMappingAtMillis(realtimeMillis),
+          this._userStateModel,
+          aquarium.currentResourceMapping,
           realtimeMillis
         )
 
         sender ! GetUserWalletResponse(
           Right(
             GetUserWalletResponseData(
-              this._userID,
-              this._userStateMsg.totalCredits,
-              MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries)
+              this.userID,
+              this._userStateModel.totalCredits,
+              MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
             )))
 
       case false ⇒
@@ -411,8 +399,8 @@ class UserActor extends ReflectiveRoleableActor {
             sender ! GetUserWalletResponse(
               Right(
                 GetUserWalletResponseData(
-                  this._userID,
-                  aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis),
+                  this.userID,
+                  Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
                   MessageFactory.newWalletEntriesMsg()
                 )))
 
@@ -423,12 +411,24 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
+  /**
+   * Initializes the actor's internal state.
+   *
+   * @param userID
+   */
   def onSetUserActorUserID(userID: String) {
-    this._userID = userID
+    // Create the full agreement history from the original sources (IMEvents)
+    this._userStateModel = ModelFactory.newInitialUserStateModel(
+      userID,
+      Real(0),
+      TimeHelpers.nowMillis()
+    )
+
+    require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
   }
 
   private[this] def D_userID = {
-    this._userID
+    if(haveUserState) userID else "???"
   }
 
   private[this] def DEBUG(fmt: String, args: Any*) =