From: Christos KK Loverdos Date: Mon, 19 Mar 2012 14:02:57 +0000 (+0200) Subject: Introduce user state worker in order to modularize user state computations X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/b3cec08741eb883ebe33489d2f4a5cbe374d14ec Introduce user state worker in order to modularize user state computations --- diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index 412b0b4..2d18fe1 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -262,62 +262,9 @@ class UserStateComputations extends Loggable { // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies var _workingUserState = startingUserState.copyForChangeReason(calculationReason) - // This is a collection of all the latest resource events. - // We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time) - // ones. - // Will be updated on processing the next resource event. - val previousResourceEvents = startingUserState.latestResourceEventsSnapshot.toMutableWorker - // Prepare the implicitly issued resource events from previous billing period - val implicitlyIssuedResourceEvents = _workingUserState.implicitlyIssuedSnapshot.toMutableWorker - // Keep the resource events from this period that were first (and unused) of their kind - val ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty - - /** - * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource - * events and b) the explicit previous resource events. If the event is found, it is removed from the - * respective source. - * - * If the event is not found, then this must be for a new resource instance. 
- * (and probably then some `zero` resource event must be implied as the previous one) - * - * @param resource - * @param instanceId - * @return - */ - def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { - // implicitly terminated events are checked first - implicitlyIssuedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { - case just @ Just(_) ⇒ - just - case NoVal ⇒ - // explicit previous resource events are checked second - previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { - case just @ Just(_) ⇒ - just - case noValOrFailed ⇒ - noValOrFailed - } - case failed ⇒ - failed - } - } - - def debugTheMaps(): Unit = { - if(previousResourceEvents.size > 0) { - val map = previousResourceEvents.resourceEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } - clog.debugMap("previousResourceEvents", map, 0) - } - if(implicitlyIssuedResourceEvents.size > 0) { - val map = implicitlyIssuedResourceEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } - clog.debugMap("implicitlyTerminatedResourceEvents", map, 0) - } - if(ignoredFirstResourceEvents.size > 0) { - val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } - clog.debugMap("ignoredFirstResourceEvents", map, 0) - } - } + val userStateWorker = UserStateWorker.fromUserState(startingUserState) - debugTheMaps() + userStateWorker.debugTheMaps(clog)(rcDebugInfo) // Find the actual resource events from DB val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod( @@ -359,7 +306,7 @@ class UserStateComputations extends Loggable { val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent) clog.begin(currentResourceEventDebugInfo) - debugTheMaps() + userStateWorker.debugTheMaps(clog)(rcDebugInfo) // Ignore the event if it is not billable (but still record it in the "previous" stuff). 
// But to make this decision, first we need the resource definition (and its cost policy). @@ -378,7 +325,7 @@ class UserStateComputations extends Loggable { // The resource event is billable // Find the previous event. // This is (potentially) needed to calculate new credit amount and new resource instance amount - val previousResourceEventM = findAndRemovePreviousResourceEvent(theResource, theInstanceId) + val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId) clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_))) val havePreviousResourceEvent = previousResourceEventM.isJust @@ -387,7 +334,7 @@ class UserStateComputations extends Loggable { // This must be the first resource event of its kind, ever. // TODO: We should normally check the DB to verify the claim (?) clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo) - ignoredFirstResourceEvents.updateResourceEvent(currentResourceEvent) + userStateWorker.updateIgnored(currentResourceEvent) } else { val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount) @@ -471,7 +418,7 @@ class UserStateComputations extends Loggable { } // After processing, all event, billable or not update the previous state - previousResourceEvents.updateResourceEvent(currentResourceEvent) + userStateWorker.updatePrevious(currentResourceEvent) // We do not have a resource (and no cost policy) case None ⇒ @@ -485,8 +432,8 @@ class UserStateComputations extends Loggable { val lastUpdateTime = TimeHelpers.nowMillis _workingUserState = _workingUserState.copy( - implicitlyIssuedSnapshot = implicitlyIssuedResourceEvents.toImmutableSnapshot(lastUpdateTime), - latestResourceEventsSnapshot = previousResourceEvents.toImmutableSnapshot(lastUpdateTime), + implicitlyIssuedSnapshot = 
userStateWorker.implicitlyIssuedResourceEvents.toImmutableSnapshot(lastUpdateTime), + latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime), stateChangeCounter = _workingUserState.stateChangeCounter + 1, parentUserStateId = startingUserState.idOpt, newWalletEntries = calculatedNewWalletEntries @@ -512,3 +459,88 @@ class UserStateComputations extends Loggable { _workingUserState } } + +/** + * A helper object holding intermediate state/results during resource event processing. The workers are mutable: the methods of this class update them in place, and the caller snapshots them (`toImmutableSnapshot`) once processing ends. + * + * @param previousResourceEvents + * This is a collection of all the latest resource events. + * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time) + * ones. Will be updated on processing the next resource event. + * + * @param implicitlyIssuedResourceEvents + * The implicitly issued resource events (from previous billing period). + * + * @param ignoredFirstResourceEvents + * The resource events that were first (and unused) of their kind. + * + * @author Christos KK Loverdos + */ +case class UserStateWorker(previousResourceEvents: LatestResourceEventsWorker, + implicitlyIssuedResourceEvents: ImplicitlyIssuedResourceEventsWorker, + ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker) { + + /** + * Finds the previous resource event by checking two possible sources: a) The implicitly issued resource + * events and b) the explicit previous resource events. If the event is found, it is removed from the + * respective source. + * + * If the event is not found, then this must be for a new resource instance. 
+ * (and probably then some `zero` resource event must be implied as the previous one) + * + * @param resource the resource whose previous event is looked up + * @param instanceId the resource instance whose previous event is looked up + * @return the `Just` found in the implicitly issued events; if that lookup gives `NoVal`, the unmodified result of the lookup in the previous events; a first lookup that is neither `Just` nor `NoVal` is returned as is + */ + def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { + // implicitly issued events are checked first + implicitlyIssuedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { + case just @ Just(_) ⇒ + just + case NoVal ⇒ + // explicit previous resource events are checked second + previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { + case just @ Just(_) ⇒ + just + case noValOrFailed ⇒ + noValOrFailed + } + case failed ⇒ + failed + } + } + + def updateIgnored(resourceEvent: ResourceEvent): Unit = { // called by UserStateComputations for a billable event that has no previous event + ignoredFirstResourceEvents.updateResourceEvent(resourceEvent) + } + + def updatePrevious(resourceEvent: ResourceEvent): Unit = { // called by UserStateComputations after processing an event, billable or not + previousResourceEvents.updateResourceEvent(resourceEvent) + } + + def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = { // logs every non-empty worker map via clog.debugMap, rendering each event with rcDebugInfo + if(previousResourceEvents.size > 0) { + val map = previousResourceEvents.resourceEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } + clog.debugMap("previousResourceEvents", map, 0) + } + if(implicitlyIssuedResourceEvents.size > 0) { + val map = implicitlyIssuedResourceEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } + clog.debugMap("implicitlyTerminatedResourceEvents", map, 0) // NOTE(review): label says "terminated" although this map holds the implicitly issued events + } + if(ignoredFirstResourceEvents.size > 0) { + val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } + clog.debugMap("ignoredFirstResourceEvents", map, 0) + } + } + +} + +object UserStateWorker { + def fromUserState(userState: UserState): UserStateWorker = { // workers come from the two snapshots of userState via toMutableWorker; ignored-first events start from Empty + UserStateWorker( + userState.latestResourceEventsSnapshot.toMutableWorker, + userState.implicitlyIssuedSnapshot.toMutableWorker, + IgnoredFirstResourceEventsWorker.Empty // NOTE(review): updateIgnored writes into this worker, so Empty must not be a shared instance — confirm it creates a fresh one + ) + } +}