package gr.grnet.aquarium.charging
-import scala.collection.mutable
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.charging.state.{WorkingResourcesChargingState, UserStateBootstrap, WorkingUserState, UserStateModel, StdUserState}
-import gr.grnet.aquarium.policy.ResourceType
-import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
import gr.grnet.aquarium.util.LogHelpers.Debug
import gr.grnet.aquarium.util.LogHelpers.DebugSeq
import gr.grnet.aquarium.util.LogHelpers.Warn
import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel}
/**
*
// Lifecycle hook with an empty body: stopping performs no work here.
def stop() {}
//- Lifecycle
-
- //+ Utility methods
- protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
- rcEvent.toDebugString
- }
- //- Utility methods
-
- def calculateRealtimeWorkingUserState(
- workingUserState: WorkingUserState,
- billingMonthInfo: BillingMonthInfo,
+ def calculateRealtimeUserState(
+ userStateModel: UserStateModel,
+ resourceMapping: JMap[String, ResourceTypeMsg],
realtimeMillis: Long
) {
// Walks every entry of userStateMsg.getStateOfResources and, for each resource instance of a
// known resource, asks that resource's charging behavior for virtual events at realtimeMillis
// and feeds them to processResourceEvents.
// NOTE(review): userAgreementHistoryModel is assigned but not referenced in the visible lines.
- for( (resourceTypeName, workingResourcesState) ← workingUserState.workingStateOfResources) {
- workingUserState.findResourceType(resourceTypeName) match {
- case None ⇒
+
+ val userStateMsg = userStateModel.userStateMsg
+ val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
+ import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+ val stateOfResources = userStateMsg.getStateOfResources.asScala
+
+ for( (resourceName, workingResourcesState) ← stateOfResources) {
// resourceMapping is a java.util.Map, whose get yields null for a missing key; hence the
// null pattern below where the removed code matched on None.
+ resourceMapping.get(resourceName) match {
+ case null ⇒
// Ignore
- case Some(resourceType) ⇒
- val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
+ case resourceTypeMsg ⇒
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+ val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
- for((resourceInstanceID, workingResourceInstanceState) ← workingResourcesState.stateOfResourceInstance) {
- Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
+ for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
+ Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
- workingUserState.userID,
- resourceTypeName,
+ userStateMsg.getUserID,
+ resourceName,
resourceInstanceID,
realtimeMillis,
- workingResourceInstanceState
+ resourceInstanceState
)
DebugSeq(logger, "virtualEvents", virtualEvents, 1)
// realtimeMillis is passed twice: first as processingTimeMillis, last as latestUpdateMillis.
processResourceEvents(
+ realtimeMillis,
virtualEvents,
- workingUserState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
realtimeMillis
)
}
}
def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
- userStateBootstrap: UserStateBootstrap,
- defaultResourceTypesMap: Map[String, ResourceType],
- userStateRecorder: UserStateModel ⇒ UserStateModel
- ): WorkingUserState = {
-
- def computeFullMonthBillingAndSaveState(): WorkingUserState = {
- val workingUserState = replayFullMonthBilling(
- userStateBootstrap,
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
// NOTE(review): only fragments of this method are visible here; the control flow that joins
// the fragments below (conditions, the match header) is not shown and must be read in the
// full file.
+
// Local helper: replays the whole billingMonthInfo month, copies the result into a message
// flagged as full-month with the billing year and month set, and hands it to userStateRecorder.
+ def computeFullMonthBillingAndSaveState(): UserStateMsg = {
+ val fullMonthUserState = replayFullMonthBilling(
+ processingTimeMillis,
+ userStateModel,
billingMonthInfo,
- defaultResourceTypesMap,
+ resourceMapping,
userStateRecorder
)
- val monthlyUserState0 = workingUserState.toUserState(
- true,
- billingMonthInfo.year,
- billingMonthInfo.month,
- ""
- )
+ val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+ setIsForFullMonth(true).
+ setBillingYear(billingMonthInfo.year).
+ setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
+ setOriginalID("").
+ build()
// We always save the state when it is a full month billing
val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
- Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString)
+ Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
// The added line yields the value returned by userStateRecorder; the removed line yielded the
// working state obtained before recording.
- workingUserState
+ monthlyUserState1
}
- val userID = userStateBootstrap.userID
- val userCreationMillis = userStateBootstrap.userCreationMillis
+ val userID = userStateModel.userID
+ val userCreationMillis = userStateModel.unsafeUserCreationMillis
val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
val billingMonthStartMillis = billingMonthInfo.monthStartMillis
val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// TODO: The initial user state might have already been created.
// First ask if it exists and compute only if not
// The initial message is built from userID, Real.Zero and the current wall-clock time
// (TimeHelpers.nowMillis()), not from processingTimeMillis.
- val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap(
- userStateBootstrap,
+ val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+ userID,
+ Real.Zero,
TimeHelpers.nowMillis()
)
// We always save the initial state
val initialUserState1 = userStateRecorder.apply(initialUserState0)
- Debug(logger, "Stored initial state = %s", initialUserState1.toJsonString)
+ Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
- return initialUserState1.toWorkingUserState(defaultResourceTypesMap)
+ return initialUserState1
}
// Ask DB cache for the latest known user state for this billing period
case Some(latestUserState) ⇒
// Found a "latest" user state but need to see if it is indeed the true and one latest.
// For this reason, we must count the events again.
- val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+ val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
userID,
billingMonthStartMillis,
// ZERO, we are OK!
case 0 ⇒
// NOTE: Keep the caller's calculation reason
// The stored message is returned as is; the removed line converted it to a working state first.
- latestUserState.toWorkingUserState(defaultResourceTypesMap)
+ latestUserState
// We had more, so must recompute
case n if n > 0 ⇒
}
/**
* Processes one resource event and computes relevant, incremental charges.
+ * If needed, it may go back in time and recompute stuff.
*
* @param resourceEvent
- * @param workingUserState
- * @param billingMonthInfo
*/
def processResourceEvent(
- resourceEvent: ResourceEventModel,
- workingUserState: WorkingUserState,
- billingMonthInfo: BillingMonthInfo,
+ processingTimeMillis: Long,
+ resourceEvent: ResourceEventMsg,
+ userStateModel: UserStateModel,
+ resourceMapping: JMap[String, ResourceTypeMsg],
updateLatestMillis: Boolean
- ): Boolean = {
+ ) {
// Charges a single resource event against userStateModel.userStateMsg: looks up the resource
// type and its charging behavior, obtains (or initializes) the per-resource charging state,
// delegates to chargingBehavior.processResourceEvent, then updates timestamps and credits.
// The billing month is now derived from the event's occurred time instead of being a parameter.
// NOTE(review): processingTimeMillis, userAgreementHistoryModel and eventReceivedMillis are
// not referenced in the visible lines.
+ val userStateMsg = userStateModel.userStateMsg
+ val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
- val resourceTypeName = resourceEvent.resource
- val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName)
- if(resourceTypeOpt.isEmpty) {
- // Unknown (yet) resource, ignoring event.
- return false
- }
- val resourceType = resourceTypeOpt.get
+ val resourceName = resourceEvent.getResource
+ val resourceTypeMsg = resourceMapping.get(resourceName)
// NOTE(review): java.util.Map.get yields null for a missing key. The removed code returned
// early for an unknown resource; the added code passes the result straight to
// chargingBehaviorOf. Confirm that an unknown resource cannot reach this point or that
// chargingBehaviorOf tolerates null.
+
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+ val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+ userStateMsg,
+ resourceName,
+ chargingBehavior.initialChargingDetails
+ )
+
+ val eventOccurredMillis = resourceEvent.getOccurredMillis
+ val eventReceivedMillis = resourceEvent.getReceivedMillis
+ val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+ // See if this is out-of-sync
- val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
- val workingResourcesState = workingUserState.workingStateOfResources.get(resourceTypeName) match {
- case Some(existingState) ⇒
- existingState
- case None ⇒
- // First time for this ChargingBehavior.
- val newState = new WorkingResourcesChargingState(
- details = mutable.Map(chargingBehavior.initialChargingDetails.toSeq:_*),
- stateOfResourceInstance = mutable.Map()
- )
-
- workingUserState.workingStateOfResources(resourceTypeName) = newState
- newState
- }
// NOTE(review): m0 is captured here but only m1 is read in the visible lines.
val m0 = TimeHelpers.nowMillis()
// The last argument is a callback that appends each wallet entry it receives to
// userStateMsg.getWalletEntries.
val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
aquarium,
resourceEvent,
- resourceType,
+ resourceTypeMsg,
billingMonthInfo,
- workingResourcesState,
- workingUserState.workingAgreementHistory,
- workingUserState.totalCredits,
- workingUserState.walletEntries += _
+ resourcesChargingState,
+ userStateModel,
+ msg ⇒ userStateMsg.getWalletEntries.add(msg)
)
val m1 = TimeHelpers.nowMillis()
// Callers that process many events pass updateLatestMillis = false and set the timestamp
// themselves after their loop.
if(updateLatestMillis) {
- workingUserState.latestUpdateMillis = m1
+ userStateMsg.setLatestUpdateMillis(m1)
}
- workingUserState.updateLatestResourceEventOccurredMillis(resourceEvent.occurredMillis)
- workingUserState.totalCredits -= creditsToSubtract
+ MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+ MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
// NOTE(review): the added signature uses procedure syntax (no result type), so this trailing
// value is discarded; it is a leftover from the removed Boolean-returning version.
true
}
def processResourceEvents(
- resourceEvents: Traversable[ResourceEventModel],
- workingUserState: WorkingUserState,
- billingMonthInfo: BillingMonthInfo,
+ processingTimeMillis: Long,
+ resourceEvents: Traversable[ResourceEventMsg],
+ userStateModel: UserStateModel,
+ resourceMapping: JMap[String, ResourceTypeMsg],
latestUpdateMillis: Long
): Unit = {
// Feeds each event, in traversal order, to processResourceEvent with updateLatestMillis = false,
// then stamps the user state once with latestUpdateMillis when the counter is positive.
// NOTE(review): no increment of _counter appears in the visible lines; if the full file has
// none either, the branch after the loop never runs — confirm.
var _counter = 0
for(currentResourceEvent ← resourceEvents) {
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- workingUserState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
false
)
}
if(_counter > 0) {
- workingUserState.latestUpdateMillis = latestUpdateMillis
+ userStateModel.userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
}
}
def replayFullMonthBilling(
- userStateBootstrap: UserStateBootstrap,
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
- defaultResourceTypesMap: Map[String, ResourceType],
- userStateRecorder: UserStateModel ⇒ UserStateModel
- ): WorkingUserState = {
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
// Thin delegate: replays the month via replayMonthChargingUpTo, using the month's own
// monthStopMillis as the billing end time, and returns whatever that call returns.
replayMonthChargingUpTo(
+ processingTimeMillis,
+ userStateModel,
billingMonthInfo,
billingMonthInfo.monthStopMillis,
- userStateBootstrap,
- defaultResourceTypesMap,
+ resourceMapping,
userStateRecorder
)
}
*
* @param billingMonthInfo Which month to bill.
* @param billingEndTimeMillis Bill from start of month up to (and including) this time.
- * @param userStateBootstrap
- * @param resourceTypesMap
* @param userStateRecorder
* @return
*/
def replayMonthChargingUpTo(
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
billingEndTimeMillis: Long,
- userStateBootstrap: UserStateBootstrap,
- resourceTypesMap: Map[String, ResourceType],
- userStateRecorder: UserStateModel ⇒ UserStateModel
- ): WorkingUserState = {
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
// NOTE(review): only fragments of this method are visible here (the event-loop header and the
// start of a commented-out region near the end are not shown); read the full file before
// relying on the notes below.
+
+ val userAgreementHistoryMsg = userStateModel.userAgreementHistoryMsg
// True exactly when the requested end time equals the end of the billing month.
val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
- val userID = userStateBootstrap.userID
+ val userID = userStateModel.userID
// In order to replay the full month, we start with the state at the beginning of the month.
val previousBillingMonthInfo = billingMonthInfo.previousMonth
// The starting state is requested for the previous billing month, not for billingMonthInfo.
- val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis,
+ userStateModel,
previousBillingMonthInfo,
- userStateBootstrap,
- resourceTypesMap,
+ resourceMapping,
userStateRecorder
)
// specified in the parameters.
// NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
- Debug(logger, "workingUserState=%s", workingUserState)
+ Debug(logger, "workingUserState=%s", userStateMsg)
Debug(logger, "previousBillingMonthUserState(%s) = %s",
previousBillingMonthInfo.toShortDebugString,
- workingUserState
+ userStateMsg
)
var _rcEventsCounter = 0
Debug(logger, "Processing %s", currentResourceEvent)
// Events are charged against userStateModel with updateLatestMillis = false; the timestamp
// is set once after the loop.
// NOTE(review): the timestamp below is written to userStateMsg (the message returned by the
// lookup above), while events are applied through userStateModel — confirm both refer to the
// same underlying message.
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- workingUserState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
false
)
}
if(_rcEventsCounter > 0) {
- workingUserState.latestUpdateMillis = TimeHelpers.nowMillis()
+ userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
}
Debug(logger, "Found %s resource events for month %s",
workingUserState.totalCredits = specialWorkingUserState.totalCredits
}*/
- workingUserState
+ userStateMsg
}
}