X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/56daab8c8f4bd85dc2d1c75dfda827a50bc50894..f2e3cc2beadcdfa86fb63e6b0429d958ec2adb8f:/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala?ds=sidebyside diff --git a/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala b/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala index 7a2db51..6b93fbd 100644 --- a/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala +++ b/src/main/scala/gr/grnet/aquarium/charging/ChargingService.scala @@ -35,17 +35,17 @@ package gr.grnet.aquarium.charging -import scala.collection.mutable -import gr.grnet.aquarium.event.model.resource.ResourceEventModel import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.charging.state.{WorkingResourcesChargingState, UserStateBootstrap, WorkingUserState, UserStateModel, StdUserState} -import gr.grnet.aquarium.policy.ResourceType -import gr.grnet.aquarium.util.{Lifecycle, Loggable} +import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers} import gr.grnet.aquarium.util.LogHelpers.Debug import gr.grnet.aquarium.util.LogHelpers.DebugSeq import gr.grnet.aquarium.util.LogHelpers.Warn import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers} -import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton} +import gr.grnet.aquarium.util.{Lifecycle, Loggable} +import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton} +import java.util.{Map ⇒ JMap} +import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel} /** * @@ -63,41 +63,44 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo def stop() {} //- Lifecycle - - //+ Utility methods - protected def rcDebugInfo(rcEvent: ResourceEventModel) = { - rcEvent.toDebugString - } - //- Utility methods - - def calculateRealtimeWorkingUserState( - workingUserState: WorkingUserState, - billingMonthInfo: BillingMonthInfo, + def calculateRealtimeUserState( + userStateModel: UserStateModel, + resourceMapping: JMap[String, ResourceTypeMsg], realtimeMillis: Long ) { - for( (resourceTypeName, workingResourcesState) ← workingUserState.workingStateOfResources) { - workingUserState.findResourceType(resourceTypeName) match { - case None ⇒ + + val userStateMsg = userStateModel.userStateMsg + val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg + + import scala.collection.JavaConverters.mapAsScalaMapConverter + + val stateOfResources = userStateMsg.getStateOfResources.asScala + + for( (resourceName, workingResourcesState) ← stateOfResources) { + resourceMapping.get(resourceName) match { + case null ⇒ // Ignore - case Some(resourceType) ⇒ - val chargingBehavior = aquarium.chargingBehaviorOf(resourceType) + case resourceTypeMsg ⇒ + val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg) + val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala - for((resourceInstanceID, workingResourceInstanceState) ← workingResourcesState.stateOfResourceInstance) { - Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID) + for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) { + Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID) val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation( - workingUserState.userID, - resourceTypeName, + userStateMsg.getUserID, + resourceName, resourceInstanceID, realtimeMillis, - workingResourceInstanceState + resourceInstanceState ) DebugSeq(logger, "virtualEvents", virtualEvents, 1) processResourceEvents( + realtimeMillis, virtualEvents, - workingUserState, - billingMonthInfo, + userStateModel, + resourceMapping, realtimeMillis ) } @@ -106,37 +109,39 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo } def findOrCalculateWorkingUserStateAtEndOfBillingMonth( + processingTimeMillis: Long, + userStateModel: UserStateModel, billingMonthInfo: BillingMonthInfo, - userStateBootstrap: UserStateBootstrap, - defaultResourceTypesMap: Map[String, ResourceType], - userStateRecorder: UserStateModel ⇒ UserStateModel - ): WorkingUserState = { - - def computeFullMonthBillingAndSaveState(): WorkingUserState = { - val workingUserState = replayFullMonthBilling( - userStateBootstrap, + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { + + def computeFullMonthBillingAndSaveState(): UserStateMsg = { + val fullMonthUserState = replayFullMonthBilling( + processingTimeMillis, + userStateModel, billingMonthInfo, - defaultResourceTypesMap, + resourceMapping, userStateRecorder ) - val monthlyUserState0 = workingUserState.toUserState( - true, - billingMonthInfo.year, - billingMonthInfo.month, - "" - ) + val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState). + setIsForFullMonth(true). + setBillingYear(billingMonthInfo.year). + setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay? + setOriginalID(""). + build() // We always save the state when it is a full month billing val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0) - Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString) + Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1)) - workingUserState + monthlyUserState1 } - val userID = userStateBootstrap.userID - val userCreationMillis = userStateBootstrap.userCreationMillis + val userID = userStateModel.userID + val userCreationMillis = userStateModel.unsafeUserCreationMillis val userCreationDateCalc = new MutableDateCalc(userCreationMillis) val billingMonthStartMillis = billingMonthInfo.monthStartMillis val billingMonthStopMillis = billingMonthInfo.monthStopMillis @@ -147,8 +152,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // TODO: The initial user state might have already been created. // First ask if it exists and compute only if not - val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap( - userStateBootstrap, + val initialUserState0 = MessageFactory.newInitialUserStateMsg( + userID, + Real.Zero, TimeHelpers.nowMillis() ) @@ -157,9 +163,9 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // We always save the initial state val initialUserState1 = userStateRecorder.apply(initialUserState0) - Debug(logger, "Stored initial state = %s", initialUserState1.toJsonString) + Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1)) - return initialUserState1.toWorkingUserState(defaultResourceTypesMap) + return initialUserState1 } // Ask DB cache for the latest known user state for this billing period @@ -176,7 +182,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo case Some(latestUserState) ⇒ // Found a "latest" user state but need to see if it is indeed the true and one latest. // For this reason, we must count the events again. - val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter + val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod( userID, billingMonthStartMillis, @@ -187,7 +193,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // ZERO, we are OK! case 0 ⇒ // NOTE: Keep the caller's calculation reason - latestUserState.toWorkingUserState(defaultResourceTypesMap) + latestUserState // We had more, so must recompute case n if n > 0 ⇒ @@ -205,78 +211,76 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo } /** * Processes one resource event and computes relevant, incremental charges. + * If needed, it may go back in time and recompute stuff. * * @param resourceEvent - * @param workingUserState - * @param billingMonthInfo */ def processResourceEvent( - resourceEvent: ResourceEventModel, - workingUserState: WorkingUserState, - billingMonthInfo: BillingMonthInfo, + processingTimeMillis: Long, + resourceEvent: ResourceEventMsg, + userStateModel: UserStateModel, + resourceMapping: JMap[String, ResourceTypeMsg], updateLatestMillis: Boolean - ): Boolean = { + ) { + val userStateMsg = userStateModel.userStateMsg + val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg - val resourceTypeName = resourceEvent.resource - val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName) - if(resourceTypeOpt.isEmpty) { - // Unknown (yet) resource, ignoring event. - return false - } - val resourceType = resourceTypeOpt.get + val resourceName = resourceEvent.getResource + val resourceTypeMsg = resourceMapping.get(resourceName) + + val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg) + val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState( + userStateMsg, + resourceName, + chargingBehavior.initialChargingDetails + ) + + val eventOccurredMillis = resourceEvent.getOccurredMillis + val eventReceivedMillis = resourceEvent.getReceivedMillis + val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) + + + // See if this is out-of-sync - val chargingBehavior = aquarium.chargingBehaviorOf(resourceType) - val workingResourcesState = workingUserState.workingStateOfResources.get(resourceTypeName) match { - case Some(existingState) ⇒ - existingState - case None ⇒ - // First time for this ChargingBehavior. - val newState = new WorkingResourcesChargingState( - details = mutable.Map(chargingBehavior.initialChargingDetails.toSeq:_*), - stateOfResourceInstance = mutable.Map() - ) - - workingUserState.workingStateOfResources(resourceTypeName) = newState - newState - } val m0 = TimeHelpers.nowMillis() val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent( aquarium, resourceEvent, - resourceType, + resourceTypeMsg, billingMonthInfo, - workingResourcesState, - workingUserState.workingAgreementHistory, - workingUserState.totalCredits, - workingUserState.walletEntries += _ + resourcesChargingState, + userStateModel, + msg ⇒ userStateMsg.getWalletEntries.add(msg) ) val m1 = TimeHelpers.nowMillis() if(updateLatestMillis) { - workingUserState.latestUpdateMillis = m1 + userStateMsg.setLatestUpdateMillis(m1) } - workingUserState.updateLatestResourceEventOccurredMillis(resourceEvent.occurredMillis) - workingUserState.totalCredits -= creditsToSubtract + MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis) + MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract) true } def processResourceEvents( - resourceEvents: Traversable[ResourceEventModel], - workingUserState: WorkingUserState, - billingMonthInfo: BillingMonthInfo, + processingTimeMillis: Long, + resourceEvents: Traversable[ResourceEventMsg], + userStateModel: UserStateModel, + resourceMapping: JMap[String, ResourceTypeMsg], latestUpdateMillis: Long ): Unit = { var _counter = 0 for(currentResourceEvent ← resourceEvents) { processResourceEvent( + processingTimeMillis, currentResourceEvent, - workingUserState, - billingMonthInfo, + userStateModel, + resourceMapping, false ) @@ -284,22 +288,24 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo } if(_counter > 0) { - workingUserState.latestUpdateMillis = latestUpdateMillis + userStateModel.userStateMsg.setLatestUpdateMillis(latestUpdateMillis) } } def replayFullMonthBilling( - userStateBootstrap: UserStateBootstrap, + processingTimeMillis: Long, + userStateModel: UserStateModel, billingMonthInfo: BillingMonthInfo, - defaultResourceTypesMap: Map[String, ResourceType], - userStateRecorder: UserStateModel ⇒ UserStateModel - ): WorkingUserState = { + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { replayMonthChargingUpTo( + processingTimeMillis, + userStateModel, billingMonthInfo, billingMonthInfo.monthStopMillis, - userStateBootstrap, - defaultResourceTypesMap, + resourceMapping, userStateRecorder ) } @@ -310,28 +316,30 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo * * @param billingMonthInfo Which month to bill. * @param billingEndTimeMillis Bill from start of month up to (and including) this time. - * @param userStateBootstrap - * @param resourceTypesMap * @param userStateRecorder * @return */ def replayMonthChargingUpTo( + processingTimeMillis: Long, + userStateModel: UserStateModel, billingMonthInfo: BillingMonthInfo, billingEndTimeMillis: Long, - userStateBootstrap: UserStateBootstrap, - resourceTypesMap: Map[String, ResourceType], - userStateRecorder: UserStateModel ⇒ UserStateModel - ): WorkingUserState = { + resourceMapping: JMap[String, ResourceTypeMsg], + userStateRecorder: UserStateMsg ⇒ UserStateMsg + ): UserStateMsg = { + + val userAgreementHistoryMsg = userStateModel.userAgreementHistoryMsg val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis - val userID = userStateBootstrap.userID + val userID = userStateModel.userID // In order to replay the full month, we start with the state at the beginning of the month. val previousBillingMonthInfo = billingMonthInfo.previousMonth - val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth( + val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth( + processingTimeMillis, + userStateModel, previousBillingMonthInfo, - userStateBootstrap, - resourceTypesMap, + resourceMapping, userStateRecorder ) @@ -340,10 +348,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo // specified in the parameters. // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies - Debug(logger, "workingUserState=%s", workingUserState) + Debug(logger, "workingUserState=%s", userStateMsg) Debug(logger, "previousBillingMonthUserState(%s) = %s", previousBillingMonthInfo.toShortDebugString, - workingUserState + userStateMsg ) var _rcEventsCounter = 0 @@ -356,9 +364,10 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo Debug(logger, "Processing %s", currentResourceEvent) processResourceEvent( + processingTimeMillis, currentResourceEvent, - workingUserState, - billingMonthInfo, + userStateModel, + resourceMapping, false ) @@ -366,7 +375,7 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo } if(_rcEventsCounter > 0) { - workingUserState.latestUpdateMillis = TimeHelpers.nowMillis() + userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis()) } Debug(logger, "Found %s resource events for month %s", @@ -407,6 +416,6 @@ final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Lo workingUserState.totalCredits = specialWorkingUserState.totalCredits }*/ - workingUserState + userStateMsg } }