X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/68349f490e09f63a01065e6eb066b29f79e558c1..c66d56a7b64c619e49ae2bc2b0ddcf9905fbbbd9:/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala diff --git a/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala b/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala index febed87..1c3b02e 100644 --- a/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala +++ b/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala @@ -35,15 +35,17 @@ package gr.grnet.aquarium.charging -import gr.grnet.aquarium.event.model.resource.ResourceEventModel import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.charging.state.UserStateBootstrap -import gr.grnet.aquarium.policy.ResourceType -import gr.grnet.aquarium.util.{Lifecycle, Loggable, ContextualLogger} +import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers} +import gr.grnet.aquarium.util.LogHelpers.Debug +import gr.grnet.aquarium.util.LogHelpers.DebugSeq +import gr.grnet.aquarium.util.LogHelpers.Warn import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers} -import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton} -import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel, StdUserState} -import gr.grnet.aquarium.charging.reason.{MonthlyBillChargingReason, InitialUserStateSetup, ChargingReason} +import gr.grnet.aquarium.util.{Lifecycle, Loggable} +import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton} +import java.util.{Map ⇒ JMap} +import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel /** * @@ -56,89 +58,113 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo lazy val resourceEventStore = aquarium.resourceEventStore //+ Lifecycle - def start() = () + def start() {} - def stop() = () + def stop() {} //- Lifecycle - - //+ Utility methods - protected def rcDebugInfo(rcEvent: ResourceEventModel) = { - rcEvent.toDebugString + def calculateRealtimeUserState( + userAgreementHistoryModel: UserAgreementHistoryModel, + userStateMsg: UserStateMsg, + resourceMapping: JMap[String, ResourceTypeMsg], + realtimeMillis: Long + ) { + + import scala.collection.JavaConverters.mapAsScalaMapConverter + + val stateOfResources = userStateMsg.getStateOfResources.asScala + + for( (resourceName, workingResourcesState) ← stateOfResources) { + resourceMapping.get(resourceName) match { + case null ⇒ + // Ignore + + case resourceTypeMsg ⇒ + val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg) + val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala + + for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) { + Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID) + val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation( + userStateMsg.getUserID, + resourceName, + resourceInstanceID, + realtimeMillis, + resourceInstanceState + ) + DebugSeq(logger, "virtualEvents", virtualEvents, 1) + + processResourceEvents( + realtimeMillis, + virtualEvents, + userAgreementHistoryModel, + userStateMsg, + resourceMapping, + realtimeMillis + ) + } + } + } } - //- Utility methods def findOrCalculateWorkingUserStateAtEndOfBillingMonth( + processingTimeMillis: Long, + userAgreementHistoryModel: UserAgreementHistoryModel, billingMonthInfo: BillingMonthInfo, - userStateBootstrap: UserStateBootstrap, - defaultResourceTypesMap: Map[String, ResourceType], - chargingReason: ChargingReason, - userStateRecorder: UserStateModel ⇒ UserStateModel, - clogOpt: Option[ContextualLogger] - ): WorkingUserState = { - - val clog = ContextualLogger.fromOther( - clogOpt, - logger, - "findOrCalculateWorkingUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString) - clog.begin() - - lazy val clogSome = Some(clog) - - def computeFullMonthBillingAndSaveState(): WorkingUserState = { - val workingUserState = replayFullMonthBilling( - userStateBootstrap, + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { + + def computeFullMonthBillingAndSaveState(): UserStateMsg = { + val fullMonthUserState = replayFullMonthBilling( + processingTimeMillis, + userAgreementHistoryModel, billingMonthInfo, - defaultResourceTypesMap, - chargingReason, - userStateRecorder, - clogSome + resourceMapping, + userStateRecorder ) - val newChargingReason = MonthlyBillChargingReason(chargingReason, billingMonthInfo) - workingUserState.chargingReason = newChargingReason - val monthlyUserState0 = workingUserState.toUserState( - true, - billingMonthInfo.year, - billingMonthInfo.month, - None - ) + val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState). + setIsFullBillingMonth(true). + setBillingYear(billingMonthInfo.year). + setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay? + setOriginalID(""). + build() // We always save the state when it is a full month billing val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0) - clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString) + Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1)) - workingUserState + monthlyUserState1 } - val userID = userStateBootstrap.userID - val userCreationMillis = userStateBootstrap.userCreationMillis + val userID = userAgreementHistoryModel.userID + val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis val userCreationDateCalc = new MutableDateCalc(userCreationMillis) val billingMonthStartMillis = billingMonthInfo.monthStartMillis val billingMonthStopMillis = billingMonthInfo.monthStopMillis if(billingMonthStopMillis < userCreationMillis) { // If the user did not exist for this billing month, piece of cake - clog.debug("User did not exist before %s", userCreationDateCalc) + Debug(logger, "User did not exist before %s", userCreationDateCalc) // TODO: The initial user state might have already been created. // First ask if it exists and compute only if not - val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap( - userStateBootstrap, - TimeHelpers.nowMillis(), - InitialUserStateSetup(Some(chargingReason)) // we record the originating calculation reason + val initialUserState0 = MessageFactory.newInitialUserStateMsg( + userID, + Real.Zero, + TimeHelpers.nowMillis() ) - logger.debug("Created (from bootstrap) initial user state %s".format(initialUserState0)) + Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0) // We always save the initial state val initialUserState1 = userStateRecorder.apply(initialUserState0) - clog.debug("Stored initial state = %s", initialUserState1.toJsonString) - clog.end() + Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1)) - return initialUserState1.toWorkingUserState(defaultResourceTypesMap) + return initialUserState1 } // Ask DB cache for the latest known user state for this billing period @@ -149,15 +175,13 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo latestUserStateOpt match { case None ⇒ // Not found, must compute - clog.debug("No user state found from cache, will have to (re)compute") - val result = computeFullMonthBillingAndSaveState - clog.end() - result + Debug(logger, "No user state found from cache, will have to (re)compute") + computeFullMonthBillingAndSaveState case Some(latestUserState) ⇒ // Found a "latest" user state but need to see if it is indeed the true and one latest. // For this reason, we must count the events again. - val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter + val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod( userID, billingMonthStartMillis, @@ -168,104 +192,124 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // ZERO, we are OK! case 0 ⇒ // NOTE: Keep the caller's calculation reason - val userStateModel = latestUserState.newWithChargingReason(chargingReason) - clog.end() - userStateModel.toWorkingUserState(defaultResourceTypesMap) + latestUserState // We had more, so must recompute case n if n > 0 ⇒ - clog.debug( + Debug(logger, "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n) - val workingUserState = computeFullMonthBillingAndSaveState - clog.end() - workingUserState + computeFullMonthBillingAndSaveState // We had less???? case n if n < 0 ⇒ val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n) - clog.warn(errMsg) + Warn(logger, errMsg) throw new AquariumInternalError(errMsg) } } } /** - * Processes one resource event and computes relevant charges. + * Processes one resource event and computes relevant, incremental charges. + * If needed, it may go back in time and recompute stuff. * * @param resourceEvent - * @param workingUserState - * @param chargingReason - * @param billingMonthInfo - * @param clogOpt + * @param userStateMsg */ def processResourceEvent( - resourceEvent: ResourceEventModel, - workingUserState: WorkingUserState, - chargingReason: ChargingReason, - billingMonthInfo: BillingMonthInfo, - clogOpt: Option[ContextualLogger] - ): Unit = { + processingTimeMillis: Long, + resourceEvent: ResourceEventMsg, + userAgreementHistoryModel: UserAgreementHistoryModel, + userStateMsg: UserStateMsg, + resourceMapping: JMap[String, ResourceTypeMsg], + updateLatestMillis: Boolean + ) { + require(userStateMsg ne null, "userStateMsg ne null") + + val resourceName = resourceEvent.getResource + val resourceTypeMsg = resourceMapping.get(resourceName) + + val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg) + val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState( + userStateMsg, + resourceName, + chargingBehavior.initialChargingDetails + ) + + val eventOccurredMillis = resourceEvent.getOccurredMillis + val eventReceivedMillis = resourceEvent.getReceivedMillis + val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) + + + // See if this is out-of-sync - val resourceTypeName = resourceEvent.resource - val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName) - if(resourceTypeOpt.isEmpty) { - return - } - val resourceType = resourceTypeOpt.get - val resourceAndInstanceInfo = resourceEvent.safeResourceInstanceInfo - val chargingBehavior = aquarium.chargingBehaviorOf(resourceType) - val (walletEntriesCount, newTotalCredits) = chargingBehavior.chargeResourceEvent( + val m0 = TimeHelpers.nowMillis() + val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent( aquarium, resourceEvent, - resourceType, + resourceTypeMsg, billingMonthInfo, - workingUserState.workingAgreementHistory.toAgreementHistory, - workingUserState.getChargingDataForResourceEvent(resourceAndInstanceInfo), - workingUserState.totalCredits, - workingUserState.walletEntries += _, - clogOpt + resourcesChargingState, + userAgreementHistoryModel, + userStateMsg, + msg ⇒ userStateMsg.getWalletEntries.add(msg) ) + val m1 = TimeHelpers.nowMillis() - workingUserState.totalCredits = newTotalCredits + if(updateLatestMillis) { + userStateMsg.setLatestUpdateMillis(m1) + } + + MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis) + MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract) + + true } def processResourceEvents( - resourceEvents: Traversable[ResourceEventModel], - workingUserState: WorkingUserState, - chargingReason: ChargingReason, - billingMonthInfo: BillingMonthInfo, - clogOpt: Option[ContextualLogger] = None + processingTimeMillis: Long, + resourceEvents: Traversable[ResourceEventMsg], + userAgreementHistoryModel: UserAgreementHistoryModel, + userStateMsg: UserStateMsg, + resourceMapping: JMap[String, ResourceTypeMsg], + latestUpdateMillis: Long ): Unit = { + var _counter = 0 for(currentResourceEvent ← resourceEvents) { processResourceEvent( + processingTimeMillis, currentResourceEvent, - workingUserState, - chargingReason, - billingMonthInfo, - clogOpt + userAgreementHistoryModel, + userStateMsg, + resourceMapping, + false ) + + _counter += 1 + } + + if(_counter > 0) { + userStateMsg.setLatestUpdateMillis(latestUpdateMillis) } } def replayFullMonthBilling( - userStateBootstrap: UserStateBootstrap, + processingTimeMillis: Long, + userAgreementHistoryModel: UserAgreementHistoryModel, billingMonthInfo: BillingMonthInfo, - defaultResourceTypesMap: Map[String, ResourceType], - chargingReason: ChargingReason, - userStateRecorder: UserStateModel ⇒ UserStateModel, - clogOpt: Option[ContextualLogger] - ): WorkingUserState = { + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { replayMonthChargingUpTo( + processingTimeMillis, + userAgreementHistoryModel, billingMonthInfo, billingMonthInfo.monthStopMillis, - userStateBootstrap, - defaultResourceTypesMap, - chargingReason, - userStateRecorder, - clogOpt + resourceMapping, + userStateRecorder ) } @@ -275,45 +319,29 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo * * @param billingMonthInfo Which month to bill. * @param billingEndTimeMillis Bill from start of month up to (and including) this time. - * @param userStateBootstrap - * @param resourceTypesMap - * @param chargingReason * @param userStateRecorder - * @param clogOpt * @return */ def replayMonthChargingUpTo( + processingTimeMillis: Long, + userAgreementHistoryModel: UserAgreementHistoryModel, billingMonthInfo: BillingMonthInfo, billingEndTimeMillis: Long, - userStateBootstrap: UserStateBootstrap, - resourceTypesMap: Map[String, ResourceType], - chargingReason: ChargingReason, - userStateRecorder: UserStateModel ⇒ UserStateModel, - clogOpt: Option[ContextualLogger] - ): WorkingUserState = { + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis - val userID = userStateBootstrap.userID - - val clog = ContextualLogger.fromOther( - clogOpt, - logger, - "replayMonthChargingUpTo(%s)", TimeHelpers.toYYYYMMDDHHMMSSSSS(billingEndTimeMillis)) - clog.begin() - - clog.debug("%s", chargingReason) - - val clogSome = Some(clog) + val userID = userAgreementHistoryModel.userID // In order to replay the full month, we start with the state at the beginning of the month. val previousBillingMonthInfo = billingMonthInfo.previousMonth - val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth( + val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth( + processingTimeMillis, + userAgreementHistoryModel, previousBillingMonthInfo, - userStateBootstrap, - resourceTypesMap, - chargingReason, - userStateRecorder, - clogSome + resourceMapping, + userStateRecorder ) // FIXME the below comments @@ -321,9 +349,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // specified in the parameters. // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies - clog.debug("previousBillingMonthUserState(%s) = %s".format( + Debug(logger, "workingUserState=%s", userStateMsg) + Debug(logger, "previousBillingMonthUserState(%s) = %s", previousBillingMonthInfo.toShortDebugString, - workingUserState.toJsonString) + userStateMsg ) var _rcEventsCounter = 0 @@ -333,22 +362,31 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo billingEndTimeMillis // to requested time ) { currentResourceEvent ⇒ - clog.debug("Processing %s".format(currentResourceEvent)) + Debug(logger, "Processing %s", currentResourceEvent) processResourceEvent( + processingTimeMillis, currentResourceEvent, - workingUserState, - chargingReason, - billingMonthInfo, - clogSome + userAgreementHistoryModel, + userStateMsg, + resourceMapping, + false ) _rcEventsCounter += 1 } - clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString)) + if(_rcEventsCounter > 0) { + userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis()) + } - if(isFullMonthBilling) { + Debug(logger, "Found %s resource events for month %s", + _rcEventsCounter, + billingMonthInfo.toShortDebugString + ) + + // FIXME Reuse the logic here...Do not erase the comment... + /*if(isFullMonthBilling) { // For the remaining events which must contribute an implicit OFF, we collect those OFFs // ... in order to generate an implicit ON later (during the next billing cycle). val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents( @@ -357,10 +395,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo ) if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) { - clog.debug("") - clog.debug("Process implicitly issued events") - clog.debugSeq("generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0) - clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0) + Debug(logger, "") + Debug(logger, "Process implicitly issued events") + DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0) + DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0) } // Now, the previous and implicitly started must be our base for the following computation, so we create an @@ -373,15 +411,13 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo theirImplicitEnds, specialWorkingUserState, chargingReason, - billingMonthInfo, - clogSome + billingMonthInfo ) workingUserState.walletEntries ++= specialWorkingUserState.walletEntries workingUserState.totalCredits = specialWorkingUserState.totalCredits - } + }*/ - clog.end() - workingUserState + userStateMsg } }