package gr.grnet.aquarium.charging
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
import gr.grnet.aquarium.computation.BillingMonthInfo
-import gr.grnet.aquarium.charging.state.UserStateBootstrap
-import gr.grnet.aquarium.policy.ResourceType
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, ContextualLogger}
+import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
+import gr.grnet.aquarium.util.LogHelpers.Debug
+import gr.grnet.aquarium.util.LogHelpers.DebugSeq
+import gr.grnet.aquarium.util.LogHelpers.Warn
import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
-import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
-import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel, StdUserState}
-import gr.grnet.aquarium.charging.reason.{MonthlyBillChargingReason, InitialUserStateSetup, ChargingReason}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.{Real, AquariumInternalError, AquariumAwareSkeleton}
+import java.util.{Map ⇒ JMap}
+import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
/**
*
lazy val resourceEventStore = aquarium.resourceEventStore
//+ Lifecycle
- def start() = ()
+ def start() {}
- def stop() = ()
+ def stop() {}
//- Lifecycle
-
- //+ Utility methods
- protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
- rcEvent.toDebugString
+ def calculateRealtimeUserState(
+ userAgreementHistoryModel: UserAgreementHistoryModel,
+ userStateMsg: UserStateMsg,
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ realtimeMillis: Long
+ ) {
+
+ import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+ val stateOfResources = userStateMsg.getStateOfResources.asScala
+
+ for( (resourceName, workingResourcesState) ← stateOfResources) {
+ resourceMapping.get(resourceName) match {
+ case null ⇒
+ // Ignore
+
+ case resourceTypeMsg ⇒
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+ val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
+
+ for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
+ Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
+ val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
+ userStateMsg.getUserID,
+ resourceName,
+ resourceInstanceID,
+ realtimeMillis,
+ resourceInstanceState
+ )
+ DebugSeq(logger, "virtualEvents", virtualEvents, 1)
+
+ processResourceEvents(
+ realtimeMillis,
+ virtualEvents,
+ userAgreementHistoryModel,
+ userStateMsg,
+ resourceMapping,
+ realtimeMillis
+ )
+ }
+ }
+ }
}
- //- Utility methods
def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis: Long,
+ userAgreementHistoryModel: UserAgreementHistoryModel,
billingMonthInfo: BillingMonthInfo,
- userStateBootstrap: UserStateBootstrap,
- defaultResourceTypesMap: Map[String, ResourceType],
- chargingReason: ChargingReason,
- userStateRecorder: UserStateModel ⇒ UserStateModel,
- clogOpt: Option[ContextualLogger]
- ): WorkingUserState = {
-
- val clog = ContextualLogger.fromOther(
- clogOpt,
- logger,
- "findOrCalculateWorkingUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
- clog.begin()
-
- lazy val clogSome = Some(clog)
-
- def computeFullMonthBillingAndSaveState(): WorkingUserState = {
- val workingUserState = replayFullMonthBilling(
- userStateBootstrap,
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
+
+ def computeFullMonthBillingAndSaveState(): UserStateMsg = {
+ val fullMonthUserState = replayFullMonthBilling(
+ processingTimeMillis,
+ userAgreementHistoryModel,
billingMonthInfo,
- defaultResourceTypesMap,
- chargingReason,
- userStateRecorder,
- clogSome
+ resourceMapping,
+ userStateRecorder
)
- val newChargingReason = MonthlyBillChargingReason(chargingReason, billingMonthInfo)
- workingUserState.chargingReason = newChargingReason
- val monthlyUserState0 = workingUserState.toUserState(
- true,
- billingMonthInfo.year,
- billingMonthInfo.month,
- None
- )
+ val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
+ setIsFullBillingMonth(true).
+ setBillingYear(billingMonthInfo.year).
+ setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
+ setOriginalID("").
+ build()
// We always save the state when it is a full month billing
val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
- clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString)
+ Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
- workingUserState
+ monthlyUserState1
}
- val userID = userStateBootstrap.userID
- val userCreationMillis = userStateBootstrap.userCreationMillis
+ val userID = userAgreementHistoryModel.userID
+ val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis
val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
val billingMonthStartMillis = billingMonthInfo.monthStartMillis
val billingMonthStopMillis = billingMonthInfo.monthStopMillis
if(billingMonthStopMillis < userCreationMillis) {
// If the user did not exist for this billing month, piece of cake
- clog.debug("User did not exist before %s", userCreationDateCalc)
+ Debug(logger, "User did not exist before %s", userCreationDateCalc)
// TODO: The initial user state might have already been created.
// First ask if it exists and compute only if not
- val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap(
- userStateBootstrap,
- TimeHelpers.nowMillis(),
- InitialUserStateSetup(Some(chargingReason)) // we record the originating calculation reason
+ val initialUserState0 = MessageFactory.newInitialUserStateMsg(
+ userID,
+ Real.Zero,
+ TimeHelpers.nowMillis()
)
- logger.debug("Created (from bootstrap) initial user state %s".format(initialUserState0))
+ Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
// We always save the initial state
val initialUserState1 = userStateRecorder.apply(initialUserState0)
- clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
- clog.end()
+ Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
- return initialUserState1.toWorkingUserState(defaultResourceTypesMap)
+ return initialUserState1
}
// Ask DB cache for the latest known user state for this billing period
latestUserStateOpt match {
case None ⇒
// Not found, must compute
- clog.debug("No user state found from cache, will have to (re)compute")
- val result = computeFullMonthBillingAndSaveState
- clog.end()
- result
+ Debug(logger, "No user state found from cache, will have to (re)compute")
+ computeFullMonthBillingAndSaveState
case Some(latestUserState) ⇒
// Found a "latest" user state but need to see if it is indeed the true and one latest.
// For this reason, we must count the events again.
- val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+ val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
userID,
billingMonthStartMillis,
// ZERO, we are OK!
case 0 ⇒
// NOTE: Keep the caller's calculation reason
- val userStateModel = latestUserState.newWithChargingReason(chargingReason)
- clog.end()
- userStateModel.toWorkingUserState(defaultResourceTypesMap)
+ latestUserState
// We had more, so must recompute
case n if n > 0 ⇒
- clog.debug(
+ Debug(logger,
"Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
- val workingUserState = computeFullMonthBillingAndSaveState
- clog.end()
- workingUserState
+ computeFullMonthBillingAndSaveState
// We had less????
case n if n < 0 ⇒
val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
- clog.warn(errMsg)
+ Warn(logger, errMsg)
throw new AquariumInternalError(errMsg)
}
}
}
/**
- * Processes one resource event and computes relevant charges.
+ * Processes one resource event and computes relevant, incremental charges.
+ * If needed, it may go back in time and recompute stuff.
*
* @param resourceEvent
- * @param workingUserState
- * @param chargingReason
- * @param billingMonthInfo
- * @param clogOpt
+ * @param userStateMsg
*/
def processResourceEvent(
- resourceEvent: ResourceEventModel,
- workingUserState: WorkingUserState,
- chargingReason: ChargingReason,
- billingMonthInfo: BillingMonthInfo,
- clogOpt: Option[ContextualLogger]
- ): Unit = {
+ processingTimeMillis: Long,
+ resourceEvent: ResourceEventMsg,
+ userAgreementHistoryModel: UserAgreementHistoryModel,
+ userStateMsg: UserStateMsg,
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ updateLatestMillis: Boolean
+ ) {
+ require(userStateMsg ne null, "userStateMsg ne null")
+
+ val resourceName = resourceEvent.getResource
+ val resourceTypeMsg = resourceMapping.get(resourceName)
+
+ val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
+ val resourcesChargingState = MessageHelpers.getOrInitializeResourcesChargingState(
+ userStateMsg,
+ resourceName,
+ chargingBehavior.initialChargingDetails
+ )
+
+ val eventOccurredMillis = resourceEvent.getOccurredMillis
+ val eventReceivedMillis = resourceEvent.getReceivedMillis
+ val billingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+
+
+ // See if this is out-of-sync
- val resourceTypeName = resourceEvent.resource
- val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName)
- if(resourceTypeOpt.isEmpty) {
- return
- }
- val resourceType = resourceTypeOpt.get
- val resourceAndInstanceInfo = resourceEvent.safeResourceInstanceInfo
- val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
- val (walletEntriesCount, newTotalCredits) = chargingBehavior.chargeResourceEvent(
+ val m0 = TimeHelpers.nowMillis()
+ val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
aquarium,
resourceEvent,
- resourceType,
+ resourceTypeMsg,
billingMonthInfo,
- workingUserState.workingAgreementHistory.toAgreementHistory,
- workingUserState.getChargingDataForResourceEvent(resourceAndInstanceInfo),
- workingUserState.totalCredits,
- workingUserState.walletEntries += _,
- clogOpt
+ resourcesChargingState,
+ userAgreementHistoryModel,
+ userStateMsg,
+ msg ⇒ userStateMsg.getWalletEntries.add(msg)
)
+ val m1 = TimeHelpers.nowMillis()
- workingUserState.totalCredits = newTotalCredits
+ if(updateLatestMillis) {
+ userStateMsg.setLatestUpdateMillis(m1)
+ }
+
+ MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
+ MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
+
+ true
}
def processResourceEvents(
- resourceEvents: Traversable[ResourceEventModel],
- workingUserState: WorkingUserState,
- chargingReason: ChargingReason,
- billingMonthInfo: BillingMonthInfo,
- clogOpt: Option[ContextualLogger] = None
+ processingTimeMillis: Long,
+ resourceEvents: Traversable[ResourceEventMsg],
+ userAgreementHistoryModel: UserAgreementHistoryModel,
+ userStateMsg: UserStateMsg,
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ latestUpdateMillis: Long
): Unit = {
+ var _counter = 0
for(currentResourceEvent ← resourceEvents) {
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- workingUserState,
- chargingReason,
- billingMonthInfo,
- clogOpt
+ userAgreementHistoryModel,
+ userStateMsg,
+ resourceMapping,
+ false
)
+
+ _counter += 1
+ }
+
+ if(_counter > 0) {
+ userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
}
}
def replayFullMonthBilling(
- userStateBootstrap: UserStateBootstrap,
+ processingTimeMillis: Long,
+ userAgreementHistoryModel: UserAgreementHistoryModel,
billingMonthInfo: BillingMonthInfo,
- defaultResourceTypesMap: Map[String, ResourceType],
- chargingReason: ChargingReason,
- userStateRecorder: UserStateModel ⇒ UserStateModel,
- clogOpt: Option[ContextualLogger]
- ): WorkingUserState = {
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
replayMonthChargingUpTo(
+ processingTimeMillis,
+ userAgreementHistoryModel,
billingMonthInfo,
billingMonthInfo.monthStopMillis,
- userStateBootstrap,
- defaultResourceTypesMap,
- chargingReason,
- userStateRecorder,
- clogOpt
+ resourceMapping,
+ userStateRecorder
)
}
*
* @param billingMonthInfo Which month to bill.
* @param billingEndTimeMillis Bill from start of month up to (and including) this time.
- * @param userStateBootstrap
- * @param resourceTypesMap
- * @param chargingReason
* @param userStateRecorder
- * @param clogOpt
* @return
*/
def replayMonthChargingUpTo(
+ processingTimeMillis: Long,
+ userAgreementHistoryModel: UserAgreementHistoryModel,
billingMonthInfo: BillingMonthInfo,
billingEndTimeMillis: Long,
- userStateBootstrap: UserStateBootstrap,
- resourceTypesMap: Map[String, ResourceType],
- chargingReason: ChargingReason,
- userStateRecorder: UserStateModel ⇒ UserStateModel,
- clogOpt: Option[ContextualLogger]
- ): WorkingUserState = {
+ resourceMapping: JMap[String, ResourceTypeMsg],
+ userStateRecorder: UserStateMsg ⇒ UserStateMsg
+ ): UserStateMsg = {
val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
- val userID = userStateBootstrap.userID
-
- val clog = ContextualLogger.fromOther(
- clogOpt,
- logger,
- "replayMonthChargingUpTo(%s)", TimeHelpers.toYYYYMMDDHHMMSSSSS(billingEndTimeMillis))
- clog.begin()
-
- clog.debug("%s", chargingReason)
-
- val clogSome = Some(clog)
+ val userID = userAgreementHistoryModel.userID
// In order to replay the full month, we start with the state at the beginning of the month.
val previousBillingMonthInfo = billingMonthInfo.previousMonth
- val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
+ processingTimeMillis,
+ userAgreementHistoryModel,
previousBillingMonthInfo,
- userStateBootstrap,
- resourceTypesMap,
- chargingReason,
- userStateRecorder,
- clogSome
+ resourceMapping,
+ userStateRecorder
)
// FIXME the below comments
// specified in the parameters.
// NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
- clog.debug("previousBillingMonthUserState(%s) = %s".format(
+ Debug(logger, "workingUserState=%s", userStateMsg)
+ Debug(logger, "previousBillingMonthUserState(%s) = %s",
previousBillingMonthInfo.toShortDebugString,
- workingUserState.toJsonString)
+ userStateMsg
)
var _rcEventsCounter = 0
billingEndTimeMillis // to requested time
) { currentResourceEvent ⇒
- clog.debug("Processing %s".format(currentResourceEvent))
+ Debug(logger, "Processing %s", currentResourceEvent)
processResourceEvent(
+ processingTimeMillis,
currentResourceEvent,
- workingUserState,
- chargingReason,
- billingMonthInfo,
- clogSome
+ userAgreementHistoryModel,
+ userStateMsg,
+ resourceMapping,
+ false
)
_rcEventsCounter += 1
}
- clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString))
+ if(_rcEventsCounter > 0) {
+ userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
+ }
- if(isFullMonthBilling) {
+ Debug(logger, "Found %s resource events for month %s",
+ _rcEventsCounter,
+ billingMonthInfo.toShortDebugString
+ )
+
+ // FIXME Reuse the logic here...Do not erase the comment...
+ /*if(isFullMonthBilling) {
// For the remaining events which must contribute an implicit OFF, we collect those OFFs
// ... in order to generate an implicit ON later (during the next billing cycle).
val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
)
if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
- clog.debug("")
- clog.debug("Process implicitly issued events")
- clog.debugSeq("generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
- clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
+ Debug(logger, "")
+ Debug(logger, "Process implicitly issued events")
+ DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
+ DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
}
// Now, the previous and implicitly started must be our base for the following computation, so we create an
theirImplicitEnds,
specialWorkingUserState,
chargingReason,
- billingMonthInfo,
- clogSome
+ billingMonthInfo
)
workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
workingUserState.totalCredits = specialWorkingUserState.totalCredits
- }
+ }*/
- clog.end()
- workingUserState
+ userStateMsg
}
}