Merge branch 'snapshots'
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
index f7f686c..6b93fbd 100644 (file)
 
 package gr.grnet.aquarium.charging
 
-import gr.grnet.aquarium.charging.state.{UserStateModel, UserStateBootstrap}
 import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.message.avro.gen.{ResourcesChargingStateMsg, UserStateMsg, ResourceEventMsg}
-import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, AvroHelpers}
-import gr.grnet.aquarium.policy.ResourceType
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
 import gr.grnet.aquarium.util.LogHelpers.Debug
 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
 import gr.grnet.aquarium.util.LogHelpers.Warn
 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
-import java.util.{HashMap ⇒ JHashMap}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel}
 
 /**
  *
@@ -65,30 +64,32 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   //- Lifecycle
 
   def calculateRealtimeUserState(
-      userState: UserStateModel,
-      billingMonthInfo: BillingMonthInfo,
+      userStateModel: UserStateModel,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       realtimeMillis: Long
   ) {
 
+    val userStateMsg = userStateModel.userStateMsg
+    val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
     import scala.collection.JavaConverters.mapAsScalaMapConverter
 
-    val stateOfResources = userState.msg.getStateOfResources.asScala
-    val resourceTypesMap = userState.msg.getResourceTypesMap.asScala
+    val stateOfResources = userStateMsg.getStateOfResources.asScala
 
-    for( (resourceTypeName, workingResourcesState) ← stateOfResources) {
-      userState.msg.getResourceTypesMap.get(resourceTypeName) match {
+    for( (resourceName, workingResourcesState) ← stateOfResources) {
+      resourceMapping.get(resourceName) match {
         case null ⇒
           // Ignore
 
-        case resourceType ⇒
-          val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
+        case resourceTypeMsg ⇒
+          val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
           val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
 
           for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
-            Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
+            Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
             val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
-              userState.userID,
-              resourceTypeName,
+              userStateMsg.getUserID,
+              resourceName,
               resourceInstanceID,
               realtimeMillis,
               resourceInstanceState
@@ -96,9 +97,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
             DebugSeq(logger, "virtualEvents", virtualEvents, 1)
 
             processResourceEvents(
+              realtimeMillis,
               virtualEvents,
-              userState,
-              billingMonthInfo,
+              userStateModel,
+              resourceMapping,
               realtimeMillis
             )
           }
@@ -107,26 +109,27 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   }
 
   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
-      userStateBootstrap: UserStateBootstrap,
-      defaultResourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
 
-    def computeFullMonthBillingAndSaveState(): UserStateModel = {
+    def computeFullMonthBillingAndSaveState(): UserStateMsg = {
       val fullMonthUserState = replayFullMonthBilling(
-        userStateBootstrap,
+        processingTimeMillis,
+        userStateModel,
         billingMonthInfo,
-        defaultResourceTypesMap,
+        resourceMapping,
         userStateRecorder
       )
 
-      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState.msg).
-        setIsFullBillingMonth(true).
+      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+        setIsForFullMonth(true).
         setBillingYear(billingMonthInfo.year).
         setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
         setOriginalID("").
-        setResourceTypesMap(MessageFactory.newResourceTypeMsgsMap(defaultResourceTypesMap)).
         build()
 
       // We always save the state when it is a full month billing
@@ -134,11 +137,11 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
 
-      ModelFactory.newUserStateModel(monthlyUserState1)
+      monthlyUserState1
     }
 
-    val userID = userStateBootstrap.userID
-    val userCreationMillis = userStateBootstrap.userCreationMillis
+    val userID = userStateModel.userID
+    val userCreationMillis = userStateModel.unsafeUserCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
@@ -149,9 +152,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       // TODO: The initial user state might have already been created.
       //       First ask if it exists and compute only if not
-      val initialUserState0 = MessageFactory.createInitialUserStateMsg(
-        userStateBootstrap,
-        defaultResourceTypesMap,
+      val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+        userID,
+        Real.Zero,
         TimeHelpers.nowMillis()
       )
 
@@ -162,7 +165,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
 
-      return ModelFactory.newUserStateModel(initialUserState1)
+      return initialUserState1
     }
 
     // Ask DB cache for the latest known user state for this billing period
@@ -190,7 +193,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
           // ZERO, we are OK!
           case 0 ⇒
             // NOTE: Keep the caller's calculation reason
-            ModelFactory.newUserStateModel(latestUserState)
+            latestUserState
 
           // We had more, so must recompute
           case n if n > 0 ⇒
@@ -208,74 +211,76 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   }
   /**
    * Processes one resource event and computes relevant, incremental charges.
+   * If needed, it may go back in time and recompute stuff.
    *
    * @param resourceEvent
-   * @param userStateModel
-   * @param billingMonthInfo
    */
   def processResourceEvent(
+      processingTimeMillis: Long,
       resourceEvent: ResourceEventMsg,
       userStateModel: UserStateModel,
-      billingMonthInfo: BillingMonthInfo,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       updateLatestMillis: Boolean
-  ): Boolean = {
-    logger.warn("processResourceEvent:workingUserState=%s".format(userStateModel)) //
-    val resourceTypeName = resourceEvent.getResource
-    val resourceType = userStateModel.msg.getResourceTypesMap.get(resourceTypeName)
-    if(resourceType eq null) {
-      // Unknown (yet) resource, ignoring event.
-      return false
-    }
+  ) {
+    val userStateMsg = userStateModel.userStateMsg
+    val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
+    val resourceName = resourceEvent.getResource
+    val resourceTypeMsg = resourceMapping.get(resourceName)
+
+    val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+    val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+      userStateMsg,
+      resourceName,
+      chargingBehavior.initialChargingDetails
+    )
+
+    val eventOccurredMillis = resourceEvent.getOccurredMillis
+    val eventReceivedMillis = resourceEvent.getReceivedMillis
+    val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+    // See if this is out-of-sync
+
 
-    val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
-    val resourcesChargingState = userStateModel.msg.getStateOfResources.get(resourceTypeName) match {
-      case null ⇒
-        // First time for this ChargingBehavior.
-        val newState = new ResourcesChargingStateMsg
-        newState.setResource(resourceTypeName)
-        newState.setDetails(chargingBehavior.initialChargingDetails)
-        newState.setStateOfResourceInstance(new JHashMap())
-        userStateModel.msg.getStateOfResources.put(resourceTypeName,newState)
-        newState
-      case existingState ⇒
-        existingState
-    }
 
     val m0 = TimeHelpers.nowMillis()
     val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
       aquarium,
       resourceEvent,
-      resourceType,
+      resourceTypeMsg,
       billingMonthInfo,
       resourcesChargingState,
       userStateModel,
-      msg ⇒ userStateModel.msg.getWalletEntries.add(msg)
+      msg ⇒ userStateMsg.getWalletEntries.add(msg)
     )
     val m1 = TimeHelpers.nowMillis()
 
     if(updateLatestMillis) {
-      userStateModel.msg.setLatestUpdateMillis(m1)
+      userStateMsg.setLatestUpdateMillis(m1)
     }
 
-    userStateModel.updateLatestResourceEventOccurredMillis(resourceEvent.getOccurredMillis)
-    userStateModel.subtractCredits(creditsToSubtract)
+    MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+    MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
 
     true
   }
 
   def processResourceEvents(
+      processingTimeMillis: Long,
       resourceEvents: Traversable[ResourceEventMsg],
-      userState: UserStateModel,
-      billingMonthInfo: BillingMonthInfo,
+      userStateModel: UserStateModel,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       latestUpdateMillis: Long
   ): Unit = {
 
     var _counter = 0
     for(currentResourceEvent ← resourceEvents) {
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        userState,
-        billingMonthInfo,
+        userStateModel,
+        resourceMapping,
         false
       )
 
@@ -283,22 +288,24 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     }
 
     if(_counter > 0) {
-      userState.msg.setLatestUpdateMillis(latestUpdateMillis)
+      userStateModel.userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
     }
   }
 
   def replayFullMonthBilling(
-      userStateBootstrap: UserStateBootstrap,
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
-      defaultResourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
 
     replayMonthChargingUpTo(
+      processingTimeMillis,
+      userStateModel,
       billingMonthInfo,
       billingMonthInfo.monthStopMillis,
-      userStateBootstrap,
-      defaultResourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
   }
@@ -309,28 +316,30 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
    *
    * @param billingMonthInfo Which month to bill.
    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
-   * @param userStateBootstrap
-   * @param resourceTypesMap
    * @param userStateRecorder
    * @return
    */
   def replayMonthChargingUpTo(
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
       billingEndTimeMillis: Long,
-      userStateBootstrap: UserStateBootstrap,
-      resourceTypesMap: Map[String, ResourceType],
+      resourceMapping: JMap[String, ResourceTypeMsg],
       userStateRecorder: UserStateMsg ⇒ UserStateMsg
-  ): UserStateModel = {
+  ): UserStateMsg = {
+
+    val userAgreementHistoryMsg = userStateModel.userAgreementHistoryMsg
 
     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
-    val userID = userStateBootstrap.userID
+    val userID = userStateModel.userID
 
     // In order to replay the full month, we start with the state at the beginning of the month.
     val previousBillingMonthInfo = billingMonthInfo.previousMonth
-    val userState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+    val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis,
+      userStateModel,
       previousBillingMonthInfo,
-      userStateBootstrap,
-      resourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
 
@@ -339,10 +348,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     // specified in the parameters.
     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
 
-    Debug(logger, "workingUserState=%s", userState)
+    Debug(logger, "workingUserState=%s", userStateMsg)
     Debug(logger, "previousBillingMonthUserState(%s) = %s",
       previousBillingMonthInfo.toShortDebugString,
-      userState
+      userStateMsg
     )
 
     var _rcEventsCounter = 0
@@ -355,9 +364,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       Debug(logger, "Processing %s", currentResourceEvent)
 
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        userState,
-        billingMonthInfo,
+        userStateModel,
+        resourceMapping,
         false
       )
 
@@ -365,7 +375,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     }
 
     if(_rcEventsCounter > 0) {
-      userState.msg.setLatestUpdateMillis(TimeHelpers.nowMillis())
+      userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
     }
 
     Debug(logger, "Found %s resource events for month %s",
@@ -406,6 +416,6 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
     }*/
 
-    userState
+    userStateMsg
   }
 }