package gr.grnet.aquarium.charging
-import gr.grnet.aquarium.charging.state.{UserStateModel, UserStateBootstrap}
import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.message.avro.gen.{ResourcesChargingStateMsg, UserStateMsg, ResourceEventMsg}
-import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, AvroHelpers}
-import gr.grnet.aquarium.policy.ResourceType
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
import gr.grnet.aquarium.util.LogHelpers.Debug
import gr.grnet.aquarium.util.LogHelpers.DebugSeq
import gr.grnet.aquarium.util.LogHelpers.Warn
import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
-import java.util.{HashMap ⇒ JHashMap}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel}
/**
*
//- Lifecycle
// Walks the per-resource charging state stored in the user state message. For every
// resource name that has an entry in `resourceMapping`, the matching charging behavior
// creates virtual events for `realtimeMillis` for each resource instance, and those
// events are handed to `processResourceEvents`.
// NOTE(review): some lines of this method are not shown in this diff (e.g. the closing
// parenthesis of the createVirtualEventsForRealtimeComputation call).
def calculateRealtimeUserState(
- userState: UserStateModel,
- billingMonthInfo: BillingMonthInfo,
+ userStateModel: UserStateModel,
+ resourceMapping: JMap[String, ResourceTypeMsg],
realtimeMillis: Long
) {
+ val userStateMsg = userStateModel.userStateMsg
// NOTE(review): userAgreementHistoryModel is not referenced in the lines visible here —
// confirm it is needed.
+ val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
import scala.collection.JavaConverters.mapAsScalaMapConverter
- val stateOfResources = userState.msg.getStateOfResources.asScala
- val resourceTypesMap = userState.msg.getResourceTypesMap.asScala
+ val stateOfResources = userStateMsg.getStateOfResources.asScala
- for( (resourceTypeName, workingResourcesState) ← stateOfResources) {
- userState.msg.getResourceTypesMap.get(resourceTypeName) match {
+ for( (resourceName, workingResourcesState) ← stateOfResources) {
+ resourceMapping.get(resourceName) match {
// java.util.Map#get yields null when the key has no mapping, hence the explicit null case:
// resources without a mapped type are skipped.
case null ⇒
// Ignore
- case resourceType ⇒
- val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
+ case resourceTypeMsg ⇒
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
- Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
+ Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
- userState.userID,
- resourceTypeName,
+ userStateMsg.getUserID,
+ resourceName,
resourceInstanceID,
realtimeMillis,
resourceInstanceState
DebugSeq(logger, "virtualEvents", virtualEvents, 1)
// realtimeMillis is supplied twice below: first as processingTimeMillis and last as
// latestUpdateMillis of processResourceEvents.
processResourceEvents(
+ realtimeMillis,
virtualEvents,
- userState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
realtimeMillis
)
}
}
// Produces the user state message for the end of `billingMonthInfo`. After this change
// the result is a raw UserStateMsg instead of a UserStateModel wrapper.
// NOTE(review): several hunks of this method are not shown in this diff (the definitions
// of monthlyUserState1, initialUserState1 and latestUserState, and the enclosing
// conditionals/match), so only the visible pieces are annotated.
def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
- userStateBootstrap: UserStateBootstrap,
- defaultResourceTypesMap: Map[String, ResourceType],
+ resourceMapping: JMap[String, ResourceTypeMsg],
userStateRecorder: UserStateMsg ⇒ UserStateMsg
- ): UserStateModel = {
+ ): UserStateMsg = {
// Local helper: replays the whole month, then builds a copy of the result flagged as a
// full-month state for the given billing year/month with an empty original ID.
- def computeFullMonthBillingAndSaveState(): UserStateModel = {
+ def computeFullMonthBillingAndSaveState(): UserStateMsg = {
val fullMonthUserState = replayFullMonthBilling(
- userStateBootstrap,
+ processingTimeMillis,
+ userStateModel,
billingMonthInfo,
- defaultResourceTypesMap,
+ resourceMapping,
userStateRecorder
)
- val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState.msg).
- setIsFullBillingMonth(true).
+ val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+ setIsForFullMonth(true).
setBillingYear(billingMonthInfo.year).
setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
setOriginalID("").
- setResourceTypesMap(MessageFactory.newResourceTypeMsgsMap(defaultResourceTypesMap)).
build()
// NOTE(review): monthlyUserState1 (logged and returned below) is defined in lines this
// diff does not show — presumably the result of passing monthlyUserState0 to
// userStateRecorder; confirm.
// We always save the state when it is a full month billing
Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
- ModelFactory.newUserStateModel(monthlyUserState1)
+ monthlyUserState1
}
- val userID = userStateBootstrap.userID
- val userCreationMillis = userStateBootstrap.userCreationMillis
+ val userID = userStateModel.userID
+ val userCreationMillis = userStateModel.unsafeUserCreationMillis
val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
val billingMonthStartMillis = billingMonthInfo.monthStartMillis
val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// TODO: The initial user state might have already been created.
// First ask if it exists and compute only if not
// NOTE(review): the initial state is created with Real.Zero and the current wall-clock
// time (TimeHelpers.nowMillis()) although processingTimeMillis is now a parameter —
// confirm which clock is intended.
- val initialUserState0 = MessageFactory.createInitialUserStateMsg(
- userStateBootstrap,
- defaultResourceTypesMap,
+ val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+ userID,
+ Real.Zero,
TimeHelpers.nowMillis()
)
Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
- return ModelFactory.newUserStateModel(initialUserState1)
+ return initialUserState1
}
// Ask DB cache for the latest known user state for this billing period
// ZERO, we are OK!
case 0 ⇒
// NOTE: Keep the caller's calculation reason
- ModelFactory.newUserStateModel(latestUserState)
+ latestUserState
// We had more, so must recompute
// NOTE(review): the body of this case is not shown in this diff.
case n if n > 0 ⇒
}
/**
* Processes one resource event and computes relevant, incremental charges.
+ * If needed, it may go back in time and recompute stuff.
*
* @param resourceEvent
- * @param userStateModel
- * @param billingMonthInfo
*/
def processResourceEvent(
+ processingTimeMillis: Long,
resourceEvent: ResourceEventMsg,
userStateModel: UserStateModel,
- billingMonthInfo: BillingMonthInfo,
+ resourceMapping: JMap[String, ResourceTypeMsg],
updateLatestMillis: Boolean
- ): Boolean = {
- logger.warn("processResourceEvent:workingUserState=%s".format(userStateModel)) //
- val resourceTypeName = resourceEvent.getResource
- val resourceType = userStateModel.msg.getResourceTypesMap.get(resourceTypeName)
- if(resourceType eq null) {
- // Unknown (yet) resource, ignoring event.
- return false
- }
+ ) {
// NOTE(review): with the procedure syntax above the result type is Unit (it was Boolean
// before this change), so the trailing `true` at the end of the body is a discarded value.
+ val userStateMsg = userStateModel.userStateMsg
// NOTE(review): userAgreementHistoryModel and the processingTimeMillis parameter are not
// referenced in the lines visible here — confirm they are needed.
+ val userAgreementHistoryModel = userStateModel.userAgreementHistoryMsg
+
+ val resourceName = resourceEvent.getResource
+ val resourceTypeMsg = resourceMapping.get(resourceName)
// NOTE(review): the removed code returned early when the resource type was unknown (null).
// Now a null from resourceMapping.get is passed straight to chargingBehaviorOf — confirm
// that unknown resources are filtered out before this point or handled by the callee.
+
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
// Replaces the removed inline "look up or create and register" logic further below.
+ val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+ userStateMsg,
+ resourceName,
+ chargingBehavior.initialChargingDetails
+ )
+
+ val eventOccurredMillis = resourceEvent.getOccurredMillis
// NOTE(review): eventReceivedMillis is not referenced in the lines visible here.
+ val eventReceivedMillis = resourceEvent.getReceivedMillis
// The billing month is now derived from the event's occurrence time instead of being
// passed in by the caller.
+ val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+ // See if this is out-of-sync
+
- val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
- val resourcesChargingState = userStateModel.msg.getStateOfResources.get(resourceTypeName) match {
- case null ⇒
- // First time for this ChargingBehavior.
- val newState = new ResourcesChargingStateMsg
- newState.setResource(resourceTypeName)
- newState.setDetails(chargingBehavior.initialChargingDetails)
- newState.setStateOfResourceInstance(new JHashMap())
- userStateModel.msg.getStateOfResources.put(resourceTypeName,newState)
- newState
- case existingState ⇒
- existingState
- }
val m0 = TimeHelpers.nowMillis()
// Delegates the charge computation to the charging behavior; the callback passed as the
// last argument appends each message it receives to the user state's wallet entries.
val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
aquarium,
resourceEvent,
- resourceType,
+ resourceTypeMsg,
billingMonthInfo,
resourcesChargingState,
userStateModel,
- msg ⇒ userStateModel.msg.getWalletEntries.add(msg)
+ msg ⇒ userStateMsg.getWalletEntries.add(msg)
)
// m1 is the wall-clock time taken right after the charging call returns.
val m1 = TimeHelpers.nowMillis()
if(updateLatestMillis) {
- userStateModel.msg.setLatestUpdateMillis(m1)
+ userStateMsg.setLatestUpdateMillis(m1)
}
- userStateModel.updateLatestResourceEventOccurredMillis(resourceEvent.getOccurredMillis)
- userStateModel.subtractCredits(creditsToSubtract)
+ MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+ MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
true
}
// Feeds every event of `resourceEvents` to processResourceEvent with
// updateLatestMillis = false, then sets `latestUpdateMillis` on the user state message
// once at the end, provided `_counter` is positive.
def processResourceEvents(
+ processingTimeMillis: Long,
resourceEvents: Traversable[ResourceEventMsg],
- userState: UserStateModel,
- billingMonthInfo: BillingMonthInfo,
+ userStateModel: UserStateModel,
+ resourceMapping: JMap[String, ResourceTypeMsg],
latestUpdateMillis: Long
): Unit = {
// NOTE(review): no increment of _counter is visible in this diff; if none exists in the
// full file, the latestUpdateMillis update below never runs — confirm.
var _counter = 0
for(currentResourceEvent ← resourceEvents) {
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- userState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
false
)
}
if(_counter > 0) {
- userState.msg.setLatestUpdateMillis(latestUpdateMillis)
+ userStateModel.userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
}
}
// Replays billing for the whole of `billingMonthInfo` by delegating to
// replayMonthChargingUpTo, using the month's stop time as the billing end time.
// After this change the result is a UserStateMsg rather than a UserStateModel.
def replayFullMonthBilling(
- userStateBootstrap: UserStateBootstrap,
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
- defaultResourceTypesMap: Map[String, ResourceType],
+ resourceMapping: JMap[String, ResourceTypeMsg],
userStateRecorder: UserStateMsg ⇒ UserStateMsg
- ): UserStateModel = {
+ ): UserStateMsg = {
replayMonthChargingUpTo(
+ processingTimeMillis,
+ userStateModel,
billingMonthInfo,
billingMonthInfo.monthStopMillis,
- userStateBootstrap,
- defaultResourceTypesMap,
+ resourceMapping,
userStateRecorder
)
}
*
* @param billingMonthInfo Which month to bill.
* @param billingEndTimeMillis Bill from start of month up to (and including) this time.
- * @param userStateBootstrap
- * @param resourceTypesMap
* @param userStateRecorder
* @return
*/
def replayMonthChargingUpTo(
+ processingTimeMillis: Long,
+ userStateModel: UserStateModel,
billingMonthInfo: BillingMonthInfo,
billingEndTimeMillis: Long,
- userStateBootstrap: UserStateBootstrap,
- resourceTypesMap: Map[String, ResourceType],
+ resourceMapping: JMap[String, ResourceTypeMsg],
userStateRecorder: UserStateMsg ⇒ UserStateMsg
- ): UserStateModel = {
+ ): UserStateMsg = {
+
// NOTE(review): userAgreementHistoryMsg is not referenced in the lines visible here, and
// parts of this method (the event loop header, the end of the body) are not shown in
// this diff.
+ val userAgreementHistoryMsg = userStateModel.userAgreementHistoryMsg
// True when the requested end time coincides with the end of the billing month.
val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
- val userID = userStateBootstrap.userID
+ val userID = userStateModel.userID
// In order to replay the full month, we start with the state at the beginning of the month.
val previousBillingMonthInfo = billingMonthInfo.previousMonth
- val userState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis,
+ userStateModel,
previousBillingMonthInfo,
- userStateBootstrap,
- resourceTypesMap,
+ resourceMapping,
userStateRecorder
)
// specified in the parameters.
// NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
- Debug(logger, "workingUserState=%s", userState)
+ Debug(logger, "workingUserState=%s", userStateMsg)
Debug(logger, "previousBillingMonthUserState(%s) = %s",
previousBillingMonthInfo.toShortDebugString,
- userState
+ userStateMsg
)
var _rcEventsCounter = 0
Debug(logger, "Processing %s", currentResourceEvent)
// NOTE(review): events are applied through userStateModel (processResourceEvent mutates
// userStateModel.userStateMsg), while this method stamps and returns the userStateMsg
// obtained above — confirm both refer to the same message instance, otherwise the
// replayed charges are not reflected in the returned state.
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- userState,
- billingMonthInfo,
+ userStateModel,
+ resourceMapping,
false
)
}
if(_rcEventsCounter > 0) {
// NOTE(review): uses the wall clock rather than processingTimeMillis — confirm intent.
- userState.msg.setLatestUpdateMillis(TimeHelpers.nowMillis())
+ userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
}
Debug(logger, "Found %s resource events for month %s",
workingUserState.totalCredits = specialWorkingUserState.totalCredits
}*/
- userState
+ userStateMsg
}
}