Merge branch 'snapshots'
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
index 7a2db51..6b93fbd 100644 (file)
 
 package gr.grnet.aquarium.charging
 
-import scala.collection.mutable
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
 import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.charging.state.{WorkingResourcesChargingState, UserStateBootstrap, WorkingUserState, UserStateModel, StdUserState}
-import gr.grnet.aquarium.policy.ResourceType
-import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
 import gr.grnet.aquarium.util.LogHelpers.Debug
 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
 import gr.grnet.aquarium.util.LogHelpers.Warn
 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel}
 
 /**
  *
@@ -63,41 +63,44 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   def stop() {}
   //- Lifecycle
 
-
-  //+ Utility methods
-  protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
-    rcEvent.toDebugString
-  }
-  //- Utility methods
-
-  def calculateRealtimeWorkingUserState(
-      workingUserState: WorkingUserState,
-      billingMonthInfo: BillingMonthInfo,
+  def calculateRealtimeUserState(
+      userStateModel: UserStateModel,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       realtimeMillis: Long
   ) {
-    for( (resourceTypeName, workingResourcesState) ← workingUserState.workingStateOfResources) {
-      workingUserState.findResourceType(resourceTypeName) match {
-        case None ⇒
+
+    val userStateMsg = userStateModel.userStateMsg
+    val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
+    import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+    val stateOfResources = userStateMsg.getStateOfResources.asScala
+
+    for( (resourceName, workingResourcesState) ← stateOfResources) {
+      resourceMapping.get(resourceName) match {
+        case null ⇒
           // Ignore
 
-        case Some(resourceType) ⇒
-          val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
+        case resourceTypeMsg ⇒
+          val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+          val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
 
-          for((resourceInstanceID, workingResourceInstanceState) ← workingResourcesState.stateOfResourceInstance) {
-            Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
+          for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
+            Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
             val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
-              workingUserState.userID,
-              resourceTypeName,
+              userStateMsg.getUserID,
+              resourceName,
               resourceInstanceID,
               realtimeMillis,
-              workingResourceInstanceState
+              resourceInstanceState
             )
             DebugSeq(logger, "virtualEvents", virtualEvents, 1)
 
             processResourceEvents(
+              realtimeMillis,
               virtualEvents,
-              workingUserState,
-              billingMonthInfo,
+              userStateModel,
+              resourceMapping,
               realtimeMillis
             )
           }
@@ -106,37 +109,39 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   }
 
   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
-      userStateBootstrap: UserStateBootstrap,
-      defaultResourceTypesMap: Map[String, ResourceType],
-      userStateRecorder: UserStateModel ⇒ UserStateModel
-  ): WorkingUserState = {
-
-    def computeFullMonthBillingAndSaveState(): WorkingUserState = {
-      val workingUserState = replayFullMonthBilling(
-        userStateBootstrap,
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
+
+    def computeFullMonthBillingAndSaveState(): UserStateMsg = {
+      val fullMonthUserState = replayFullMonthBilling(
+        processingTimeMillis,
+        userStateModel,
         billingMonthInfo,
-        defaultResourceTypesMap,
+        resourceMapping,
         userStateRecorder
       )
 
-      val monthlyUserState0 = workingUserState.toUserState(
-        true,
-        billingMonthInfo.year,
-        billingMonthInfo.month,
-        ""
-      )
+      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+        setIsForFullMonth(true).
+        setBillingYear(billingMonthInfo.year).
+        setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
+        setOriginalID("").
+        build()
 
       // We always save the state when it is a full month billing
       val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
 
-      Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString)
+      Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
 
-      workingUserState
+      monthlyUserState1
     }
 
-    val userID = userStateBootstrap.userID
-    val userCreationMillis = userStateBootstrap.userCreationMillis
+    val userID = userStateModel.userID
+    val userCreationMillis = userStateModel.unsafeUserCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
@@ -147,8 +152,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
 
       // TODO: The initial user state might have already been created.
       //       First ask if it exists and compute only if not
-      val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap(
-        userStateBootstrap,
+      val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+        userID,
+        Real.Zero,
         TimeHelpers.nowMillis()
       )
 
@@ -157,9 +163,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       // We always save the initial state
       val initialUserState1 = userStateRecorder.apply(initialUserState0)
 
-      Debug(logger, "Stored initial state = %s", initialUserState1.toJsonString)
+      Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
 
-      return initialUserState1.toWorkingUserState(defaultResourceTypesMap)
+      return initialUserState1
     }
 
     // Ask DB cache for the latest known user state for this billing period
@@ -176,7 +182,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       case Some(latestUserState) ⇒
         // Found a "latest" user state but need to see if it is indeed the true and one latest.
         // For this reason, we must count the events again.
-        val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+        val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
           userID,
           billingMonthStartMillis,
@@ -187,7 +193,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
           // ZERO, we are OK!
           case 0 ⇒
             // NOTE: Keep the caller's calculation reason
-            latestUserState.toWorkingUserState(defaultResourceTypesMap)
+            latestUserState
 
           // We had more, so must recompute
           case n if n > 0 ⇒
@@ -205,78 +211,76 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   }
   /**
    * Processes one resource event and computes relevant, incremental charges.
+   * If needed, it may go back in time and recompute stuff.
    *
    * @param resourceEvent
-   * @param workingUserState
-   * @param billingMonthInfo
    */
   def processResourceEvent(
-      resourceEvent: ResourceEventModel,
-      workingUserState: WorkingUserState,
-      billingMonthInfo: BillingMonthInfo,
+      processingTimeMillis: Long,
+      resourceEvent: ResourceEventMsg,
+      userStateModel: UserStateModel,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       updateLatestMillis: Boolean
-  ): Boolean = {
+  ) {
+    val userStateMsg = userStateModel.userStateMsg
+    val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
 
-    val resourceTypeName = resourceEvent.resource
-    val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName)
-    if(resourceTypeOpt.isEmpty) {
-      // Unknown (yet) resource, ignoring event.
-      return false
-    }
-    val resourceType = resourceTypeOpt.get
+    val resourceName = resourceEvent.getResource
+    val resourceTypeMsg = resourceMapping.get(resourceName)
+
+    val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+    val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+      userStateMsg,
+      resourceName,
+      chargingBehavior.initialChargingDetails
+    )
+
+    val eventOccurredMillis = resourceEvent.getOccurredMillis
+    val eventReceivedMillis = resourceEvent.getReceivedMillis
+    val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+    // See if this is out-of-sync
 
-    val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
-    val workingResourcesState = workingUserState.workingStateOfResources.get(resourceTypeName) match {
-      case Some(existingState) ⇒
-        existingState
 
-      case None ⇒
-        // First time for this ChargingBehavior.
-        val newState = new WorkingResourcesChargingState(
-          details = mutable.Map(chargingBehavior.initialChargingDetails.toSeq:_*),
-          stateOfResourceInstance = mutable.Map()
-        )
-
-        workingUserState.workingStateOfResources(resourceTypeName) = newState
-        newState
-    }
 
     val m0 = TimeHelpers.nowMillis()
     val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
       aquarium,
       resourceEvent,
-      resourceType,
+      resourceTypeMsg,
       billingMonthInfo,
-      workingResourcesState,
-      workingUserState.workingAgreementHistory,
-      workingUserState.totalCredits,
-      workingUserState.walletEntries += _
+      resourcesChargingState,
+      userStateModel,
+      msg ⇒ userStateMsg.getWalletEntries.add(msg)
     )
     val m1 = TimeHelpers.nowMillis()
 
     if(updateLatestMillis) {
-      workingUserState.latestUpdateMillis = m1
+      userStateMsg.setLatestUpdateMillis(m1)
     }
 
-    workingUserState.updateLatestResourceEventOccurredMillis(resourceEvent.occurredMillis)
-    workingUserState.totalCredits -= creditsToSubtract
+    MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+    MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
 
     true
   }
 
   def processResourceEvents(
-      resourceEvents: Traversable[ResourceEventModel],
-      workingUserState: WorkingUserState,
-      billingMonthInfo: BillingMonthInfo,
+      processingTimeMillis: Long,
+      resourceEvents: Traversable[ResourceEventMsg],
+      userStateModel: UserStateModel,
+      resourceMapping: JMap[String, ResourceTypeMsg],
       latestUpdateMillis: Long
   ): Unit = {
 
     var _counter = 0
     for(currentResourceEvent ← resourceEvents) {
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        workingUserState,
-        billingMonthInfo,
+        userStateModel,
+        resourceMapping,
         false
       )
 
@@ -284,22 +288,24 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     }
 
     if(_counter > 0) {
-      workingUserState.latestUpdateMillis = latestUpdateMillis
+      userStateModel.userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
     }
   }
 
   def replayFullMonthBilling(
-      userStateBootstrap: UserStateBootstrap,
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
-      defaultResourceTypesMap: Map[String, ResourceType],
-      userStateRecorder: UserStateModel ⇒ UserStateModel
-  ): WorkingUserState = {
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
     // Full-month replay: delegates to replayMonthChargingUpTo, passing the month's stop time as the billing end time.
     replayMonthChargingUpTo(
+      processingTimeMillis,
+      userStateModel,
       billingMonthInfo,
       billingMonthInfo.monthStopMillis,
-      userStateBootstrap,
-      defaultResourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
   }
@@ -310,28 +316,30 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
    *
    * @param billingMonthInfo Which month to bill.
    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
-   * @param userStateBootstrap
-   * @param resourceTypesMap
    * @param userStateRecorder
    * @return
    */
   def replayMonthChargingUpTo(
+      processingTimeMillis: Long,
+      userStateModel: UserStateModel,
       billingMonthInfo: BillingMonthInfo,
       billingEndTimeMillis: Long,
-      userStateBootstrap: UserStateBootstrap,
-      resourceTypesMap: Map[String, ResourceType],
-      userStateRecorder: UserStateModel ⇒ UserStateModel
-  ): WorkingUserState = {
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
+
+    val userAgreementHistoryMsg = userStateModel.userAgreementHistoryMsg
 
     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
-    val userID = userStateBootstrap.userID
+    val userID = userStateModel.userID
 
     // In order to replay the full month, we start with the state at the beginning of the month.
     val previousBillingMonthInfo = billingMonthInfo.previousMonth
-    val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+    val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis,
+      userStateModel,
       previousBillingMonthInfo,
-      userStateBootstrap,
-      resourceTypesMap,
+      resourceMapping,
       userStateRecorder
     )
 
@@ -340,10 +348,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     // specified in the parameters.
     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
 
-    Debug(logger, "workingUserState=%s", workingUserState)
+    Debug(logger, "workingUserState=%s", userStateMsg)
     Debug(logger, "previousBillingMonthUserState(%s) = %s",
       previousBillingMonthInfo.toShortDebugString,
-      workingUserState
+      userStateMsg
     )
 
     var _rcEventsCounter = 0
@@ -356,9 +364,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       Debug(logger, "Processing %s", currentResourceEvent)
 
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        workingUserState,
-        billingMonthInfo,
+        userStateModel,
+        resourceMapping,
         false
       )
 
@@ -366,7 +375,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     }
 
     if(_rcEventsCounter > 0) {
-      workingUserState.latestUpdateMillis = TimeHelpers.nowMillis()
+      userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
     }
 
     Debug(logger, "Found %s resource events for month %s",
@@ -407,6 +416,6 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
     }*/
 
-    workingUserState
+    userStateMsg
   }
 }