WIP: New state machine for message processing
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
index f7f686c..1fc351c 100644 (file)
 
 package gr.grnet.aquarium.charging
 
-import gr.grnet.aquarium.charging.state.{UserStateModel, UserStateBootstrap}
 import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.message.avro.gen.{ResourcesChargingStateMsg, UserStateMsg, ResourceEventMsg}
-import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, AvroHelpers}
-import gr.grnet.aquarium.policy.ResourceType
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
 import gr.grnet.aquarium.util.LogHelpers.Debug
 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
 import gr.grnet.aquarium.util.LogHelpers.Warn
 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
-import java.util.{HashMap ⇒ JHashMap}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.event.CreditsModel
+import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
 
 /**
  *
@@ -65,30 +65,31 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   //- Lifecycle
 
   def calculateRealtimeUserState(
-      userState: UserStateModel,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
       billingMonthInfo: BillingMonthInfo,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       realtimeMillis: Long
   ) {
 
     import scala.collection.JavaConverters.mapAsScalaMapConverter
 
-    val stateOfResources = userState.msg.getStateOfResources.asScala
-    val resourceTypesMap = userState.msg.getResourceTypesMap.asScala
+    val stateOfResources = userStateMsg.getStateOfResources.asScala
 
-    for( (resourceTypeName, workingResourcesState) ← stateOfResources) {
-      userState.msg.getResourceTypesMap.get(resourceTypeName) match {
+    for( (resourceName, workingResourcesState) ← stateOfResources) {
+      resourceMapping.get(resourceName) match {
         case null ⇒
           // Ignore
 
-        case resourceType ⇒
-          val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
+        case resourceTypeMsg ⇒
+          val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
           val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
 
           for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
-            Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
+            Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
             val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
-              userState.userID,
-              resourceTypeName,
+              userStateMsg.getUserID,
+              resourceName,
               resourceInstanceID,
               realtimeMillis,
               resourceInstanceState
@@ -97,8 +98,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
             processResourceEvents(
               virtualEvents,
-              userState,
+              userAgreementHistoryModel,
+              userStateMsg,
               billingMonthInfo,
+              resourceMapping,
               realtimeMillis
             )
           }
@@ -107,26 +110,25 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   }
 
   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
-      userStateBootstrap: UserStateBootstrap,
-      defaultResourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
 
-    def computeFullMonthBillingAndSaveState(): UserStateModel = {
+    def computeFullMonthBillingAndSaveState(): UserStateMsg = {
       val fullMonthUserState = replayFullMonthBilling(
-        userStateBootstrap,
+        userAgreementHistoryModel,
         billingMonthInfo,
-        defaultResourceTypesMap,
+        resourceMapping,
         userStateRecorder
       )
 
-      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState.msg).
+      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
         setIsFullBillingMonth(true).
         setBillingYear(billingMonthInfo.year).
         setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
         setOriginalID("").
-        setResourceTypesMap(MessageFactory.newResourceTypeMsgsMap(defaultResourceTypesMap)).
         build()
 
       // We always save the state when it is a full month billing
@@ -134,11 +136,11 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
 
-      ModelFactory.newUserStateModel(monthlyUserState1)
+      monthlyUserState1
     }
 
-    val userID = userStateBootstrap.userID
-    val userCreationMillis = userStateBootstrap.userCreationMillis
+    val userID = userAgreementHistoryModel.userID
+    val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
@@ -149,9 +151,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       // TODO: The initial user state might have already been created.
       //       First ask if it exists and compute only if not
-      val initialUserState0 = MessageFactory.createInitialUserStateMsg(
-        userStateBootstrap,
-        defaultResourceTypesMap,
+      val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+        userID,
+        CreditsModel.from(0.0),
         TimeHelpers.nowMillis()
       )
 
@@ -162,7 +164,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
 
-      return ModelFactory.newUserStateModel(initialUserState1)
+      return initialUserState1
     }
 
     // Ask DB cache for the latest known user state for this billing period
@@ -190,7 +192,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
           // ZERO, we are OK!
           case 0 ⇒
             // NOTE: Keep the caller's calculation reason
-            ModelFactory.newUserStateModel(latestUserState)
+            latestUserState
 
           // We had more, so must recompute
           case n if n > 0 ⇒
@@ -210,32 +212,34 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
    * Processes one resource event and computes relevant, incremental charges.
    *
    * @param resourceEvent
-   * @param userStateModel
+   * @param userStateMsg
    * @param billingMonthInfo
    */
   def processResourceEvent(
       resourceEvent: ResourceEventMsg,
-      userStateModel: UserStateModel,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
       billingMonthInfo: BillingMonthInfo,
-      updateLatestMillis: Boolean
+      updateLatestMillis: Boolean,
+      resourceMapping: JMap[String, ResourceTypeMsg]
   ): Boolean = {
-    logger.warn("processResourceEvent:workingUserState=%s".format(userStateModel)) //
-    val resourceTypeName = resourceEvent.getResource
-    val resourceType = userStateModel.msg.getResourceTypesMap.get(resourceTypeName)
-    if(resourceType eq null) {
+    logger.warn("processResourceEvent:workingUserState=%s".format(userStateMsg)) //
+    val resourceName = resourceEvent.getResource
+    val resourceTypeMsg = resourceMapping.get(resourceName)
+    if(resourceTypeMsg eq null) {
       // Unknown (yet) resource, ignoring event.
       return false
     }
 
-    val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
-    val resourcesChargingState = userStateModel.msg.getStateOfResources.get(resourceTypeName) match {
+    val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+    val resourcesChargingState = userStateMsg.getStateOfResources.get(resourceName) match {
       case null ⇒
         // First time for this ChargingBehavior.
-        val newState = new ResourcesChargingStateMsg
-        newState.setResource(resourceTypeName)
-        newState.setDetails(chargingBehavior.initialChargingDetails)
-        newState.setStateOfResourceInstance(new JHashMap())
-        userStateModel.msg.getStateOfResources.put(resourceTypeName,newState)
+        val newState = MessageFactory.newResourcesChargingStateMsg(
+          resourceName,
+          chargingBehavior.initialChargingDetails
+        )
+        userStateMsg.getStateOfResources.put(resourceName, newState)
         newState
       case existingState ⇒
         existingState
@@ -245,28 +249,31 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
       aquarium,
       resourceEvent,
-      resourceType,
+      resourceTypeMsg,
       billingMonthInfo,
       resourcesChargingState,
-      userStateModel,
-      msg ⇒ userStateModel.msg.getWalletEntries.add(msg)
+      userAgreementHistoryModel,
+      userStateMsg,
+      msg ⇒ userStateMsg.getWalletEntries.add(msg)
     )
     val m1 = TimeHelpers.nowMillis()
 
     if(updateLatestMillis) {
-      userStateModel.msg.setLatestUpdateMillis(m1)
+      userStateMsg.setLatestUpdateMillis(m1)
     }
 
-    userStateModel.updateLatestResourceEventOccurredMillis(resourceEvent.getOccurredMillis)
-    userStateModel.subtractCredits(creditsToSubtract)
+    MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+    MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
 
     true
   }
 
   def processResourceEvents(
       resourceEvents: Traversable[ResourceEventMsg],
-      userState: UserStateModel,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
       billingMonthInfo: BillingMonthInfo,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       latestUpdateMillis: Long
   ): Unit = {
 
@@ -274,31 +281,33 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     for(currentResourceEvent ← resourceEvents) {
       processResourceEvent(
         currentResourceEvent,
-        userState,
+        userAgreementHistoryModel,
+        userStateMsg,
         billingMonthInfo,
-        false
+        false,
+        resourceMapping
       )
 
       _counter += 1
     }
 
     if(_counter > 0) {
-      userState.msg.setLatestUpdateMillis(latestUpdateMillis)
+      userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
     }
   }
 
   def replayFullMonthBilling(
-      userStateBootstrap: UserStateBootstrap,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
-      defaultResourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
 
     replayMonthChargingUpTo(
+      userAgreementHistoryModel,
       billingMonthInfo,
       billingMonthInfo.monthStopMillis,
-      userStateBootstrap,
-      defaultResourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
   }
@@ -309,28 +318,26 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
    *
    * @param billingMonthInfo Which month to bill.
    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
-   * @param userStateBootstrap
-   * @param resourceTypesMap
    * @param userStateRecorder
    * @return
    */
   def replayMonthChargingUpTo(
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
       billingEndTimeMillis: Long,
-      userStateBootstrap: UserStateBootstrap,
-      resourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
 
     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
-    val userID = userStateBootstrap.userID
+    val userID = userAgreementHistoryModel.userID
 
     // In order to replay the full month, we start with the state at the beginning of the month.
     val previousBillingMonthInfo = billingMonthInfo.previousMonth
-    val userState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+    val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      userAgreementHistoryModel,
       previousBillingMonthInfo,
-      userStateBootstrap,
-      resourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
 
@@ -339,10 +346,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     // specified in the parameters.
     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
 
-    Debug(logger, "workingUserState=%s", userState)
+    Debug(logger, "workingUserState=%s", userStateMsg)
     Debug(logger, "previousBillingMonthUserState(%s) = %s",
       previousBillingMonthInfo.toShortDebugString,
-      userState
+      userStateMsg
     )
 
     var _rcEventsCounter = 0
@@ -356,16 +363,18 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       processResourceEvent(
         currentResourceEvent,
-        userState,
+        userAgreementHistoryModel,
+        userStateMsg,
         billingMonthInfo,
-        false
+        false,
+        resourceMapping
       )
 
       _rcEventsCounter += 1
     }
 
     if(_rcEventsCounter > 0) {
-      userState.msg.setLatestUpdateMillis(TimeHelpers.nowMillis())
+      userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
     }
 
     Debug(logger, "Found %s resource events for month %s",
@@ -406,6 +415,6 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
     }*/
 
-    userState
+    userStateMsg
   }
 }