Change the representation of computed credit values
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
index febed87..1c3b02e 100644 (file)
 
 package gr.grnet.aquarium.charging
 
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
 import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.charging.state.UserStateBootstrap
-import gr.grnet.aquarium.policy.ResourceType
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, ContextualLogger}
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
+import gr.grnet.aquarium.util.LogHelpers.Debug
+import gr.grnet.aquarium.util.LogHelpers.DebugSeq
+import gr.grnet.aquarium.util.LogHelpers.Warn
 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
-import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel, StdUserState}
-import gr.grnet.aquarium.charging.reason.{MonthlyBillChargingReason, InitialUserStateSetup, ChargingReason}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
 
 /**
  *
@@ -56,89 +58,113 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
   lazy val resourceEventStore = aquarium.resourceEventStore
 
   //+ Lifecycle
-  def start() = ()
+  // Lifecycle hook: this service has nothing to do on start.
+  def start(): Unit = ()
 
-  def stop() = ()
+  // Lifecycle hook: this service has nothing to do on stop.
+  def stop(): Unit = ()
   //- Lifecycle
 
-
-  //+ Utility methods
-  protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
-    rcEvent.toDebugString
+  /**
+   * Runs a realtime charging pass over every resource instance recorded in `userStateMsg`.
+   *
+   * For each resource name of the state that has an entry in `resourceMapping`, the charging
+   * behavior of that resource creates virtual events per resource instance; these events are
+   * then handed to `processResourceEvents` together with the same `userStateMsg`.
+   * Resource names without an entry in `resourceMapping` are ignored.
+   *
+   * @param userAgreementHistoryModel passed through to `processResourceEvents`
+   * @param userStateMsg the user state whose resource instances are visited
+   * @param resourceMapping resource name to resource type
+   * @param realtimeMillis used both as the time of the virtual events and as the
+   *                       processing/latest-update time given to `processResourceEvents`
+   */
+  def calculateRealtimeUserState(
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      realtimeMillis: Long
+  ): Unit = {
+
+    import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+    userStateMsg.getStateOfResources.asScala.foreach { case (resourceName, workingResourcesState) ⇒
+      // A java.util.Map lookup yields null for an unmapped name; Option(...) turns that into None,
+      // so unmapped resources are simply skipped.
+      Option(resourceMapping.get(resourceName)).foreach { resourceTypeMsg ⇒
+        val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+        val instanceStates = workingResourcesState.getStateOfResourceInstance.asScala
+
+        instanceStates.foreach { case (resourceInstanceID, resourceInstanceState) ⇒
+          Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
+          val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
+            userStateMsg.getUserID,
+            resourceName,
+            resourceInstanceID,
+            realtimeMillis,
+            resourceInstanceState
+          )
+          DebugSeq(logger, "virtualEvents", virtualEvents, 1)
+
+          processResourceEvents(
+            processingTimeMillis = realtimeMillis,
+            resourceEvents = virtualEvents,
+            userAgreementHistoryModel = userAgreementHistoryModel,
+            userStateMsg = userStateMsg,
+            resourceMapping = resourceMapping,
+            latestUpdateMillis = realtimeMillis
+          )
+        }
+      }
+    }
   }
-  //- Utility methods
 
   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis: Long,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
-      userStateBootstrap: UserStateBootstrap,
-      defaultResourceTypesMap: Map[String, ResourceType],
-      chargingReason: ChargingReason,
-      userStateRecorder: UserStateModel ⇒ UserStateModel,
-      clogOpt: Option[ContextualLogger]
-  ): WorkingUserState = {
-
-    val clog = ContextualLogger.fromOther(
-      clogOpt,
-      logger,
-      "findOrCalculateWorkingUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
-    clog.begin()
-
-    lazy val clogSome = Some(clog)
-
-    def computeFullMonthBillingAndSaveState(): WorkingUserState = {
-      val workingUserState = replayFullMonthBilling(
-        userStateBootstrap,
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
+
+    def computeFullMonthBillingAndSaveState(): UserStateMsg = {
+      val fullMonthUserState = replayFullMonthBilling(
+        processingTimeMillis,
+        userAgreementHistoryModel,
         billingMonthInfo,
-        defaultResourceTypesMap,
-        chargingReason,
-        userStateRecorder,
-        clogSome
+        resourceMapping,
+        userStateRecorder
       )
 
-      val newChargingReason = MonthlyBillChargingReason(chargingReason, billingMonthInfo)
-      workingUserState.chargingReason = newChargingReason
-      val monthlyUserState0 = workingUserState.toUserState(
-        true,
-        billingMonthInfo.year,
-        billingMonthInfo.month,
-        None
-      )
+      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+        setIsFullBillingMonth(true).
+        setBillingYear(billingMonthInfo.year).
+        setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
+        setOriginalID("").
+        build()
 
       // We always save the state when it is a full month billing
       val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
 
-      clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString)
+      Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
 
-      workingUserState
+      monthlyUserState1
     }
 
-    val userID = userStateBootstrap.userID
-    val userCreationMillis = userStateBootstrap.userCreationMillis
+    val userID = userAgreementHistoryModel.userID
+    val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
 
     if(billingMonthStopMillis < userCreationMillis) {
       // If the user did not exist for this billing month, piece of cake
-      clog.debug("User did not exist before %s", userCreationDateCalc)
+      Debug(logger, "User did not exist before %s", userCreationDateCalc)
 
       // TODO: The initial user state might have already been created.
       //       First ask if it exists and compute only if not
-      val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap(
-        userStateBootstrap,
-        TimeHelpers.nowMillis(),
-        InitialUserStateSetup(Some(chargingReason)) // we record the originating calculation reason
+      val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+        userID,
+        Real.Zero,
+        TimeHelpers.nowMillis()
       )
 
-      logger.debug("Created (from bootstrap) initial user state %s".format(initialUserState0))
+      Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
 
       // We always save the initial state
       val initialUserState1 = userStateRecorder.apply(initialUserState0)
 
-      clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
-      clog.end()
+      Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
 
-      return initialUserState1.toWorkingUserState(defaultResourceTypesMap)
+      return initialUserState1
     }
 
     // Ask DB cache for the latest known user state for this billing period
@@ -149,15 +175,13 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     latestUserStateOpt match {
       case None ⇒
         // Not found, must compute
-        clog.debug("No user state found from cache, will have to (re)compute")
-        val result = computeFullMonthBillingAndSaveState
-        clog.end()
-        result
+        Debug(logger, "No user state found from cache, will have to (re)compute")
+        computeFullMonthBillingAndSaveState
 
       case Some(latestUserState) ⇒
         // Found a "latest" user state but need to see if it is indeed the true and one latest.
         // For this reason, we must count the events again.
-        val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+        val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
           userID,
           billingMonthStartMillis,
@@ -168,104 +192,124 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
           // ZERO, we are OK!
           case 0 ⇒
             // NOTE: Keep the caller's calculation reason
-            val userStateModel = latestUserState.newWithChargingReason(chargingReason)
-            clog.end()
-            userStateModel.toWorkingUserState(defaultResourceTypesMap)
+            latestUserState
 
           // We had more, so must recompute
           case n if n > 0 ⇒
-            clog.debug(
+            Debug(logger,
               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
-            val workingUserState = computeFullMonthBillingAndSaveState
-            clog.end()
-            workingUserState
+            computeFullMonthBillingAndSaveState
 
           // We had less????
           case n if n < 0 ⇒
             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
-            clog.warn(errMsg)
+            Warn(logger, errMsg)
             throw new AquariumInternalError(errMsg)
         }
     }
   }
   /**
-   * Processes one resource event and computes relevant charges.
+   * Processes one resource event and computes relevant, incremental charges.
+   * If needed, it may go back in time and recompute stuff.
    *
    * @param resourceEvent
-   * @param workingUserState
-   * @param chargingReason
-   * @param billingMonthInfo
-   * @param clogOpt
+   * @param userStateMsg
    */
   def processResourceEvent(
-      resourceEvent: ResourceEventModel,
-      workingUserState: WorkingUserState,
-      chargingReason: ChargingReason,
-      billingMonthInfo: BillingMonthInfo,
-      clogOpt: Option[ContextualLogger]
-  ): Unit = {
+      processingTimeMillis: Long,
+      resourceEvent: ResourceEventMsg,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      updateLatestMillis: Boolean
+  ) {
+    require(userStateMsg ne null, "userStateMsg ne null")
+
+    val resourceName = resourceEvent.getResource
+    val resourceTypeMsg = resourceMapping.get(resourceName)
+
+    val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+    val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+      userStateMsg,
+      resourceName,
+      chargingBehavior.initialChargingDetails
+    )
+
+    val eventOccurredMillis = resourceEvent.getOccurredMillis
+    val eventReceivedMillis = resourceEvent.getReceivedMillis
+    val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+    // See if this is out-of-sync
 
-    val resourceTypeName = resourceEvent.resource
-    val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName)
-    if(resourceTypeOpt.isEmpty) {
-      return
-    }
-    val resourceType = resourceTypeOpt.get
-    val resourceAndInstanceInfo = resourceEvent.safeResourceInstanceInfo
 
-    val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
 
-    val (walletEntriesCount, newTotalCredits) = chargingBehavior.chargeResourceEvent(
+    val m0 = TimeHelpers.nowMillis()
+    val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
       aquarium,
       resourceEvent,
-      resourceType,
+      resourceTypeMsg,
       billingMonthInfo,
-      workingUserState.workingAgreementHistory.toAgreementHistory,
-      workingUserState.getChargingDataForResourceEvent(resourceAndInstanceInfo),
-      workingUserState.totalCredits,
-      workingUserState.walletEntries += _,
-      clogOpt
+      resourcesChargingState,
+      userAgreementHistoryModel,
+      userStateMsg,
+      msg ⇒ userStateMsg.getWalletEntries.add(msg)
     )
+    val m1 = TimeHelpers.nowMillis()
 
-    workingUserState.totalCredits = newTotalCredits
+    if(updateLatestMillis) {
+      userStateMsg.setLatestUpdateMillis(m1)
+    }
+
+    MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+    MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
+
+    true
   }
 
   def processResourceEvents(
-      resourceEvents: Traversable[ResourceEventModel],
-      workingUserState: WorkingUserState,
-      chargingReason: ChargingReason,
-      billingMonthInfo: BillingMonthInfo,
-      clogOpt: Option[ContextualLogger] = None
+      processingTimeMillis: Long,
+      resourceEvents: Traversable[ResourceEventMsg],
+      userAgreementHistoryModel: UserAgreementHistoryModel,
+      userStateMsg: UserStateMsg,
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      latestUpdateMillis: Long
   ): Unit = {
 
+    var _counter = 0
     for(currentResourceEvent ← resourceEvents) {
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        workingUserState,
-        chargingReason,
-        billingMonthInfo,
-        clogOpt
+        userAgreementHistoryModel,
+        userStateMsg,
+        resourceMapping,
+        false
       )
+
+      _counter += 1
+    }
+
+    if(_counter > 0) {
+      userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
     }
   }
 
   def replayFullMonthBilling(
-      userStateBootstrap: UserStateBootstrap,
+      processingTimeMillis: Long,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
-      defaultResourceTypesMap: Map[String, ResourceType],
-      chargingReason: ChargingReason,
-      userStateRecorder: UserStateModel ⇒ UserStateModel,
-      clogOpt: Option[ContextualLogger]
-  ): WorkingUserState = {
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
 
     replayMonthChargingUpTo(
+      processingTimeMillis,
+      userAgreementHistoryModel,
       billingMonthInfo,
       billingMonthInfo.monthStopMillis,
-      userStateBootstrap,
-      defaultResourceTypesMap,
-      chargingReason,
-      userStateRecorder,
-      clogOpt
+      resourceMapping,
+      userStateRecorder
     )
   }
 
@@ -275,45 +319,29 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
    *
    * @param billingMonthInfo Which month to bill.
    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
-   * @param userStateBootstrap
-   * @param resourceTypesMap
-   * @param chargingReason
    * @param userStateRecorder
-   * @param clogOpt
    * @return
    */
   def replayMonthChargingUpTo(
+      processingTimeMillis: Long,
+      userAgreementHistoryModel: UserAgreementHistoryModel,
       billingMonthInfo: BillingMonthInfo,
       billingEndTimeMillis: Long,
-      userStateBootstrap: UserStateBootstrap,
-      resourceTypesMap: Map[String, ResourceType],
-      chargingReason: ChargingReason,
-      userStateRecorder: UserStateModel ⇒ UserStateModel,
-      clogOpt: Option[ContextualLogger]
-  ): WorkingUserState = {
+      resourceMapping: JMap[String, ResourceTypeMsg],
+      userStateRecorder: UserStateMsg ⇒ UserStateMsg
+  ): UserStateMsg = {
 
     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
-    val userID = userStateBootstrap.userID
-
-    val clog = ContextualLogger.fromOther(
-      clogOpt,
-      logger,
-      "replayMonthChargingUpTo(%s)", TimeHelpers.toYYYYMMDDHHMMSSSSS(billingEndTimeMillis))
-    clog.begin()
-
-    clog.debug("%s", chargingReason)
-
-    val clogSome = Some(clog)
+    val userID = userAgreementHistoryModel.userID
 
     // In order to replay the full month, we start with the state at the beginning of the month.
     val previousBillingMonthInfo = billingMonthInfo.previousMonth
-    val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+    val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+      processingTimeMillis,
+      userAgreementHistoryModel,
       previousBillingMonthInfo,
-      userStateBootstrap,
-      resourceTypesMap,
-      chargingReason,
-      userStateRecorder,
-      clogSome
+      resourceMapping,
+      userStateRecorder
     )
 
     // FIXME the below comments
@@ -321,9 +349,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
     // specified in the parameters.
     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
 
-    clog.debug("previousBillingMonthUserState(%s) = %s".format(
+    Debug(logger, "workingUserState=%s", userStateMsg)
+    Debug(logger, "previousBillingMonthUserState(%s) = %s",
       previousBillingMonthInfo.toShortDebugString,
-      workingUserState.toJsonString)
+      userStateMsg
     )
 
     var _rcEventsCounter = 0
@@ -333,22 +362,31 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       billingEndTimeMillis               // to requested time
     ) { currentResourceEvent ⇒
 
-      clog.debug("Processing %s".format(currentResourceEvent))
+      Debug(logger, "Processing %s", currentResourceEvent)
 
       processResourceEvent(
+        processingTimeMillis,
         currentResourceEvent,
-        workingUserState,
-        chargingReason,
-        billingMonthInfo,
-        clogSome
+        userAgreementHistoryModel,
+        userStateMsg,
+        resourceMapping,
+        false
       )
 
       _rcEventsCounter += 1
     }
 
-    clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString))
+    if(_rcEventsCounter > 0) {
+      userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
+    }
 
-    if(isFullMonthBilling) {
+    Debug(logger, "Found %s resource events for month %s",
+      _rcEventsCounter,
+      billingMonthInfo.toShortDebugString
+    )
+
+    // FIXME Reuse the logic here...Do not erase the comment...
+    /*if(isFullMonthBilling) {
       // For the remaining events which must contribute an implicit OFF, we collect those OFFs
       // ... in order to generate an implicit ON later (during the next billing cycle).
       val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
@@ -357,10 +395,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
       )
 
       if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
-        clog.debug("")
-        clog.debug("Process implicitly issued events")
-        clog.debugSeq("generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
-        clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
+        Debug(logger, "")
+        Debug(logger, "Process implicitly issued events")
+        DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
+        DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
       }
 
       // Now, the previous and implicitly started must be our base for the following computation, so we create an
@@ -373,15 +411,13 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo
         theirImplicitEnds,
         specialWorkingUserState,
         chargingReason,
-        billingMonthInfo,
-        clogSome
+        billingMonthInfo
       )
 
       workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
-    }
+    }*/
 
-    clog.end()
-    workingUserState
+    userStateMsg
   }
 }