From b3e27c05983aeb76c95cf1c13bfdc371d1760c49 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Mon, 7 May 2012 15:20:37 +0300 Subject: [PATCH] WIP: ResourceEvent-related refactorings --- .../actor/service/router/RouterActor.scala | 40 +++- .../aquarium/actor/service/user/UserActor.scala | 13 +- .../aquarium/logic/accounting/Accounting.scala | 100 +++----- .../algorithm/CostPolicyAlgorithmCompiler.scala | 2 +- .../algorithm/ExecutableCostPolicyAlgorithm.scala | 2 +- .../SimpleCostPolicyAlgorithmCompiler.scala | 4 +- .../SimpleExecutableCostPolicyAlgorithm.scala | 2 +- .../gr/grnet/aquarium/store/PolicyStore.scala | 9 +- .../grnet/aquarium/store/ResourceEventStore.scala | 2 +- .../gr/grnet/aquarium/store/UserStateStore.scala | 4 - .../gr/grnet/aquarium/store/memory/MemStore.scala | 2 +- .../aquarium/store/mongodb/MongoDBStore.scala | 8 +- .../gr/grnet/aquarium/user/UserDataSnapshot.scala | 8 +- .../aquarium/user/UserStateComputations.scala | 238 ++++++++------------ .../scala/gr/grnet/aquarium/util/package.scala | 10 +- .../aquarium/user/UserStateComputationsTest.scala | 21 +- 16 files changed, 189 insertions(+), 276 deletions(-) diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala index c5178a2..7677b23 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala @@ -45,6 +45,7 @@ import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured} import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent} import gr.grnet.aquarium.actor.message.admin.PingAllRequest import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest} +import gr.grnet.aquarium.{AquariumException, AquariumInternalError} /** * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back @@ -70,19 +71,46 @@ class RouterActor extends ReflectiveRoleableActor { UserActorCache.get(userID) match { case Some(userActorRef) ⇒ userActorRef + case None ⇒ _launchUserActor(userID) } } private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = { - try { - _findOrCreateUserActor(userID) forward m + _findOrCreateUserActor(userID) forward m + } + + + /** + * Handles an exception that occurred while servicing a message. + * + * @param t + * The exception. + * @param servicingMessage + * The message that was being served while the exception happened. + * Note that the message can be `null`, in which case the exception + * is an NPE. + */ + override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = { + logChainOfCauses(t) + + def logIgnore(e: Throwable) = { + logger.error("Ignoring %s".format(shortClassNameOf(e)), e) + } + + t match { + case e: Error ⇒ + throw e + + case e: AquariumInternalError ⇒ + logIgnore(e) + + case e: AquariumException ⇒ + logIgnore(e) - } catch { case t: Throwable ⇒ - logger.error("While forwarding to user actor for userID = %s".format(userID), t) - // FIXME: We have a message that never gets to the user actor. - // FIXME: We should probably shut the user actor down. + case e: Throwable ⇒ + logIgnore(e) } } diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index b5a22a0..5b73185 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -107,7 +107,9 @@ class UserActor extends ReflectiveRoleableActor { val now = TimeHelpers.nowMillis() val imEvent = event.imEvent - val isUpdate = if(_haveIMState) { + val hadIMState = _haveIMState + + if(hadIMState) { val newOccurredMillis = imEvent.occurredMillis val currentOccurredMillis = this._imState.imEvent.occurredMillis @@ -119,14 +121,10 @@ class UserActor extends ReflectiveRoleableActor { return } - - true - } else { - false } this._imState = IMStateSnapshot(imEvent, now) - DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState)) + DEBUG("%s %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState)) } def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { @@ -142,6 +140,9 @@ class UserActor extends ReflectiveRoleableActor { } def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { + val rcEvent = event.rcEvent + + logger.info("Got\n{}", rcEvent.toJsonString) } diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala index 6a10e71..b376bff 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala @@ -43,10 +43,10 @@ import java.util.Date import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just} import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable} import gr.grnet.aquarium.store.PolicyStore -import gr.grnet.aquarium.AquariumException import gr.grnet.aquarium.event.{WalletEntry} import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc} import gr.grnet.aquarium.event.resource.ResourceEventModel +import gr.grnet.aquarium.{AquariumInternalError, AquariumException} /** * A timeslot together with the algorithm and unit price that apply for this particular timeslot. @@ -141,7 +141,7 @@ trait Accounting extends DSLUtils with Loggable { dslResource: DSLResource, policiesByTimeslot: Map[Timeslot, DSLPolicy], agreementNamesByTimeslot: Map[Timeslot, String], - clogOpt: Option[ContextualLogger] = None): Maybe[List[Chargeslot]] = Maybe { + clogOpt: Option[ContextualLogger] = None): List[Chargeslot] = { val clog = ContextualLogger.fromOther(clogOpt, logger, "computeInitialChargeslots()") // clog.begin() @@ -254,7 +254,7 @@ trait Accounting extends DSLUtils with Loggable { * Compute the charge slots generated by a particular resource event. * */ - def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEventModel], + def computeFullChargeslots(previousResourceEventOpt: Option[ResourceEventModel], currentResourceEvent: ResourceEventModel, oldCredits: Double, oldTotalAmount: Double, @@ -264,7 +264,7 @@ trait Accounting extends DSLUtils with Loggable { agreementNamesByTimeslot: Map[Timeslot, String], algorithmCompiler: CostPolicyAlgorithmCompiler, policyStore: PolicyStore, - clogOpt: Option[ContextualLogger] = None): Maybe[(Timeslot, List[Chargeslot])] = Maybe { + clogOpt: Option[ContextualLogger] = None): (Timeslot, List[Chargeslot]) = { val clog = ContextualLogger.fromOther(clogOpt, logger, "computeFullChargeslots()") // clog.begin() @@ -277,9 +277,9 @@ trait Accounting extends DSLUtils with Loggable { val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match { // We need a previous event case true ⇒ - previousResourceEventM match { + previousResourceEventOpt match { // We have a previous event - case Just(previousResourceEvent) ⇒ + case Some(previousResourceEvent) ⇒ // clog.debug("Have previous event") // clog.debug("previousValue = %s", previousResourceEvent.value) @@ -294,16 +294,10 @@ trait Accounting extends DSLUtils with Loggable { (referenceTimeslot, relevantPolicies, previousResourceEvent.value) // We do not have a previous event - case NoVal ⇒ + case None ⇒ throw new AquariumException( "Unable to charge. No previous event given for %s". format(currentResourceEvent.toDebugString())) - - // We could not obtain a previous event - case failed @ Failed(e) ⇒ - throw new AquariumException( - "Unable to charge. Could not obtain previous event for %s". - format(currentResourceEvent.toDebugString()), e) } // We do not need a previous event @@ -318,23 +312,21 @@ trait Accounting extends DSLUtils with Loggable { // clog.debug("referenceTimeslot = %s".format(referenceTimeslot)) // clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis)) - val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl) + val relevantPolicyOpt = policyStore.loadValidPolicyAt(occurredMillis, dsl) // clog.debug(" ==> relevantPolicyM = %s", relevantPolicyM) - val relevantPolicies = relevantPolicyM match { - case Just(relevantPolicy) ⇒ + val relevantPolicies = relevantPolicyOpt match { + case Some(relevantPolicy) ⇒ Map(referenceTimeslot -> relevantPolicy) - case NoVal ⇒ - throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot)) - case failed @ Failed(e) ⇒ - throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot), e) + case None ⇒ + throw new AquariumInternalError("No relevant policy found for %s".format(referenceTimeslot)) } (referenceTimeslot, relevantPolicies, previousValue) } - val initialChargeslotsM = computeInitialChargeslots( + val initialChargeslots = computeInitialChargeslots( referenceTimeslot, dslResource, relevantPolicies, @@ -342,60 +334,28 @@ trait Accounting extends DSLUtils with Loggable { Some(clog) ) - val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒ - chargeslots.map { - case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒ - val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition) - execAlgorithmM match { - case NoVal ⇒ - throw new AquariumException("Could not compile algorithm %s".format(algorithmDefinition)) - - case failed @ Failed(e) ⇒ - failed.throwMe - - case Just(execAlgorithm) ⇒ - val valueMap = costPolicy.makeValueMap( - oldCredits, - oldTotalAmount, - newTotalAmount, - stopMillis - startMillis, - previousValue, - currentResourceEvent.value, - unitPrice - ) + val fullChargeslots = initialChargeslots.map { + case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒ + val execAlgorithm = algorithmCompiler.compile(algorithmDefinition) + val valueMap = costPolicy.makeValueMap( + oldCredits, + oldTotalAmount, + newTotalAmount, + stopMillis - startMillis, + previousValue, + currentResourceEvent.value, + unitPrice + ) // clog.debug("execAlgorithm = %s", execAlgorithm) - clog.debugMap("valueMap", valueMap, 1) - - // This is it - val creditsM = execAlgorithm.apply(valueMap) - - creditsM match { - case NoVal ⇒ - throw new AquariumException( - "Could not compute credits for resource %s during %s". - format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis)))) - - case failed @ Failed(e) ⇒ - failed.throwMe + clog.debugMap("valueMap", valueMap, 1) - case Just(credits) ⇒ - chargeslot.copy(computedCredits = Some(credits)) - } - } - } - } - - val result = fullChargeslotsM match { - case Just(fullChargeslots) ⇒ - referenceTimeslot -> fullChargeslots - case NoVal ⇒ - null - case failed @ Failed(e) ⇒ - failed.throwMe + // This is it + val credits = execAlgorithm.apply(valueMap) + chargeslot.copy(computedCredits = Some(credits)) } -// clog.end() + val result = referenceTimeslot -> fullChargeslots result } diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala index 3685335..85de5eb 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala @@ -51,5 +51,5 @@ trait CostPolicyAlgorithmCompiler { * @param definition the textual representation of the algorithm * @return the executable form of the algorithm */ - def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] + def compile(definition: String): ExecutableCostPolicyAlgorithm } diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala index 49dbef2..520e61a 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala @@ -44,4 +44,4 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicyVar * * @author Christos KK Loverdos */ -trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] => Maybe[Double]) +trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] ⇒ Double) diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala index 23e8c9f..1f4448a 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala @@ -51,7 +51,7 @@ object SimpleCostPolicyAlgorithmCompiler extends CostPolicyAlgorithmCompiler { * @param definition the textual representation of the algorithm * @return the executable form of the algorithm */ - def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = { - Just(SimpleExecutableCostPolicyAlgorithm) + def compile(definition: String): ExecutableCostPolicyAlgorithm = { + SimpleExecutableCostPolicyAlgorithm } } diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala index 953303a..6b82167 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala @@ -50,7 +50,7 @@ object SimpleExecutableCostPolicyAlgorithm extends ExecutableCostPolicyAlgorithm @inline private[this] def hrs(millis: Double) = millis / 1000 / 60 / 60 - def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe { + def apply(vars: Map[DSLCostPolicyVar, Any]): Double = { vars.apply(DSLCostPolicyNameVar) match { case DSLCostPolicyNames.continuous ⇒ val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double] diff --git a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala index ed80f65..a9a58c4 100644 --- a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala @@ -71,18 +71,13 @@ trait PolicyStore { yield (timeslot, dsl.parse(policyEntry.policyYAML)) } - def loadValidPolicyEntryAt(atMillis: Long): Maybe[PolicyEntry] = Maybe { + def loadValidPolicyEntryAt(atMillis: Long): Option[PolicyEntry] = { loadPolicyEntriesAfter(0L).find { policyEntry ⇒ policyEntry.fromToTimeslot.containsTimeInMillis(atMillis) - } match { - case Some(policyEntry) ⇒ - policyEntry - case None ⇒ - null // Do not worry, this will be transformed to a NoVal by the Maybe polymorphic constructor } } - def loadValidPolicyAt(atMillis: Long, dsl: DSL): Maybe[DSLPolicy] = { + def loadValidPolicyAt(atMillis: Long, dsl: DSL): Option[DSLPolicy] = { loadValidPolicyEntryAt(atMillis).map(policyEntry ⇒ dsl.parse(policyEntry.policyYAML)) } diff --git a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala index f263dd9..4eb6016 100644 --- a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala @@ -75,7 +75,7 @@ trait ResourceEventStore { /** * Count and return the number of "out of sync" events for a billing month. */ - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] + def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long /** * Finds all relevant resource events for the billing period. diff --git a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala index f008133..2760a9e 100644 --- a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala @@ -53,10 +53,6 @@ trait UserStateStore { */ def insertUserState(userState: UserState): UserState - def insertUserState2(userState: UserState): Maybe[UserState] = { - Maybe { insertUserState(userState) } - } - /** * Find a state by user ID */ diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala index 2300ee3..985a27e 100644 --- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala @@ -273,7 +273,7 @@ class MemStore extends UserStateStore }.toList } - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = Maybe { + def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = { _resourceEvents.filter { case ev ⇒ // out of sync events are those that were received in the billing month but occurred in previous (or next?) // months diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index 1b46a3b..a8671ec 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -184,11 +184,9 @@ class MongoDBStore( MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None) } - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = { - Maybe { - // FIXME: Implement - 0L - } + def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = { + // FIXME: Implement + 0L } def findAllRelevantResourceEventsForBillingPeriod(userId: String, diff --git a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala index 55753ab..bd2d5b9 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala @@ -292,11 +292,11 @@ case class LatestResourceEventsWorker(latestEventsMap: FullMutableResourceTypeMa latestEventsMap((resourceEvent.resource, resourceEvent.instanceID)) = resourceEvent } - def findResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = { - findFromMapAsMaybe(latestEventsMap, (resource, instanceId)) + def findResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = { + latestEventsMap.get((resource, instanceId)) } - def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = { + def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = { findAndRemoveFromMap(latestEventsMap, (resource, instanceId)) } @@ -360,7 +360,7 @@ case class ImplicitlyIssuedResourceEventsWorker(implicitlyIssuedEventsMap: FullM def toImmutableSnapshot(snapshotTime: Long) = ImplicitlyIssuedResourceEventsSnapshot(toList, snapshotTime) - def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = { + def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = { findAndRemoveFromMap(implicitlyIssuedEventsMap, (resource, instanceId)) } diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index ec34ac6..c015377 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -37,17 +37,15 @@ package gr.grnet.aquarium.user import scala.collection.mutable -import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} -import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure} +import gr.grnet.aquarium.util.{ContextualLogger, Loggable} import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc} import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap} import gr.grnet.aquarium.store.{StoreProvider, PolicyStore} import gr.grnet.aquarium.logic.accounting.Accounting import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler -import gr.grnet.aquarium.event.{NewWalletEntry} +import gr.grnet.aquarium.event.NewWalletEntry import gr.grnet.aquarium.event.resource.ResourceEventModel import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent} -import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User import gr.grnet.aquarium.{AquariumInternalError, AquariumException} /** @@ -141,7 +139,7 @@ class UserStateComputations extends Loggable { accounting: Accounting, algorithmCompiler: CostPolicyAlgorithmCompiler, calculationReason: UserStateChangeReason, - clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = { + clogOpt: Option[ContextualLogger] = None): UserState = { val clog = ContextualLogger.fromOther( clogOpt, @@ -149,7 +147,7 @@ class UserStateComputations extends Loggable { "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo) clog.begin() - def doCompute: Maybe[UserState] = { + def doCompute: UserState = { doFullMonthlyBilling( userId, billingMonthInfo, @@ -176,84 +174,56 @@ class UserStateComputations extends Loggable { // NOTE: Reason here will be: InitialUserStateSetup$ val initialUserState0 = createInitialUserStateFrom(currentUserState) - val initialUserStateM = userStateStore.insertUserState2(initialUserState0) + val initialUserState1 = userStateStore.insertUserState(initialUserState0) - clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM)) + clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1)) clog.end() - initialUserStateM + initialUserState1 } else { // Ask DB cache for the latest known user state for this billing period - val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth( + val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth( userId, billingMonthInfo.year, - billingMonthInfo.month) match { + billingMonthInfo.month) - case Some(latestUserState) ⇒ - latestUserState + latestUserStateOpt match { case None ⇒ - null - }} - - latestUserStateM match { - case NoVal ⇒ // Not found, must compute clog.debug("No user state found from cache, will have to (re)compute") val result = doCompute clog.end() result - case failed @ Failed(e) ⇒ - clog.warn("Failure while quering cache for user state: %s", failed) - clog.end() - failed - - case Just(latestUserState) ⇒ + case Some(latestUserState) ⇒ // Found a "latest" user state but need to see if it is indeed the true and one latest. // For this reason, we must count the events again. val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter - val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod( + val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod( userId, billingMonthStartMillis, billingMonthStopMillis) - actualOOSEventsCounterM match { - case NoVal ⇒ - val errMsg = "No counter computed for out of sync events. Should at least be zero." - clog.warn(errMsg) - val result = Failed(new AquariumException(errMsg)) + val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter + counterDiff match { + // ZERO, we are OK! + case 0 ⇒ + // NOTE: Keep the caller's calculation reason + latestUserState.copyForChangeReason(calculationReason) + + // We had more, so must recompute + case n if n > 0 ⇒ + clog.debug( + "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n) + val result = doCompute clog.end() result - case failed @ Failed(_) ⇒ - clog.warn("Failure while querying for out of sync events: %s", failed) - clog.end() - failed - - case Just(actualOOSEventsCounter) ⇒ - val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter - counterDiff match { - // ZERO, we are OK! - case 0 ⇒ - // NOTE: Keep the caller's calculation reason - Just(latestUserState.copyForChangeReason(calculationReason)) - - // We had more, so must recompute - case n if n > 0 ⇒ - clog.debug( - "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n) - val result = doCompute - clog.end() - result - - // We had less???? - case n if n < 0 ⇒ - val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n) - clog.warn(errMsg) - val result = Failed(new AquariumException(errMsg)) - clog.end() - result - } + // We had less???? + case n if n < 0 ⇒ + val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n) + clog.warn(errMsg) + throw new AquariumException(errMsg) } } } @@ -307,15 +277,15 @@ class UserStateComputations extends Loggable { // The resource event is billable // Find the previous event. // This is (potentially) needed to calculate new credit amount and new resource instance amount - val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId) - clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_))) + val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId) + clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_))) - val havePreviousResourceEvent = previousResourceEventM.isJust + val havePreviousResourceEvent = previousResourceEventOpt.isDefined val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation if(needPreviousResourceEvent && !havePreviousResourceEvent) { // This must be the first resource event of its kind, ever. // TODO: We should normally check the DB to verify the claim (?) - clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo) + clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo) userStateWorker.updateIgnored(currentResourceEvent) } else { val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount @@ -332,8 +302,8 @@ class UserStateComputations extends Loggable { val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot // clog.debug("Computing full chargeslots") - val fullChargeslotsM = accounting.computeFullChargeslots( - previousResourceEventM, + val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots( + previousResourceEventOpt, currentResourceEvent, oldCredits, oldAmount, @@ -347,56 +317,46 @@ class UserStateComputations extends Loggable { ) // We have the chargeslots, let's associate them with the current event - fullChargeslotsM match { - case Just((referenceTimeslot, fullChargeslots)) ⇒ - if(fullChargeslots.length == 0) { - // At least one chargeslot is required. - throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id)) - } - clog.debugSeq("fullChargeslots", fullChargeslots, 0) - - // C. Compute new credit amount (based on the charge slots) - val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum - val newCredits = oldCredits - newCreditsDiff - - if(stateChangeReason.shouldStoreCalculatedWalletEntries) { - val newWalletEntry = NewWalletEntry( - userStateWorker.userId, - newCreditsDiff, - oldCredits, - newCredits, - TimeHelpers.nowMillis(), - referenceTimeslot, - billingMonthInfo.year, - billingMonthInfo.month, - if(havePreviousResourceEvent) - List(currentResourceEvent, justForSure(previousResourceEventM).get) - else - List(currentResourceEvent), - fullChargeslots, - dslResource, - currentResourceEvent.isSynthetic - ) - clog.debug("New %s", newWalletEntry) - - walletEntriesBuffer += newWalletEntry - } else { - clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits) - } - - _workingUserState = _workingUserState.copy( - creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()), - stateChangeCounter = _workingUserState.stateChangeCounter + 1, - totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1 - ) - - case NoVal ⇒ - // At least one chargeslot is required. - throw new AquariumException("No chargeslots computed") - - case failed@Failed(e) ⇒ - throw new AquariumException(e, "Error computing chargeslots") + if(fullChargeslots.length == 0) { + // At least one chargeslot is required. + throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id)) + } + clog.debugSeq("fullChargeslots", fullChargeslots, 0) + + // C. Compute new credit amount (based on the charge slots) + val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum + val newCredits = oldCredits - newCreditsDiff + + if(stateChangeReason.shouldStoreCalculatedWalletEntries) { + val newWalletEntry = NewWalletEntry( + userStateWorker.userId, + newCreditsDiff, + oldCredits, + newCredits, + TimeHelpers.nowMillis(), + referenceTimeslot, + billingMonthInfo.year, + billingMonthInfo.month, + if(havePreviousResourceEvent) + List(currentResourceEvent, previousResourceEventOpt.get) + else + List(currentResourceEvent), + fullChargeslots, + dslResource, + currentResourceEvent.isSynthetic + ) + clog.debug("New %s", newWalletEntry) + + walletEntriesBuffer += newWalletEntry + } else { + clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits) } + + _workingUserState = _workingUserState.copy( + creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()), + stateChangeCounter = _workingUserState.stateChangeCounter + 1, + totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1 + ) } } @@ -430,7 +390,7 @@ class UserStateComputations extends Loggable { var _workingUserState = startingUserState - for(currentResourceEvent <- resourceEvents) { + for(currentResourceEvent ← resourceEvents) { _workingUserState = processResourceEvent( _workingUserState, @@ -457,7 +417,7 @@ class UserStateComputations extends Loggable { accounting: Accounting, algorithmCompiler: CostPolicyAlgorithmCompiler, calculationReason: UserStateChangeReason = NoSpecificChangeReason, - clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = Maybe { + clogOpt: Option[ContextualLogger] = None): UserState = { val clog = ContextualLogger.fromOther( @@ -468,7 +428,7 @@ class UserStateComputations extends Loggable { val clogSome = Some(clog) - val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth( + val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth( userId, billingMonthInfo.previousMonth, storeProvider, @@ -480,14 +440,7 @@ class UserStateComputations extends Loggable { clogSome ) - if(previousBillingMonthUserStateM.isNoVal) { - throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo)) - } - if(previousBillingMonthUserStateM.isFailed) { - throw failedForSure(previousBillingMonthUserStateM).exception - } - - val startingUserState = justForSure(previousBillingMonthUserStateM).get + val startingUserState = previousBillingMonthUserState val userStateStore = storeProvider.userStateStore val resourceEventStore = storeProvider.resourceEventStore @@ -572,16 +525,9 @@ class UserStateComputations extends Loggable { clog.debug("calculationReason = %s", calculationReason) if(calculationReason.shouldStoreUserState) { - val storedUserStateM = userStateStore.insertUserState2(_workingUserState) - storedUserStateM match { - case Just(storedUserState) ⇒ - clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState) - _workingUserState = storedUserState - case NoVal ⇒ - clog.warn("Could not store %s", _workingUserState) - case failed @ Failed(e) ⇒ - clog.error(e, "Could not store %s", _workingUserState) - } + val storedUserState = userStateStore.insertUserState(_workingUserState) + clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState) + _workingUserState = storedUserState } clog.debug("RETURN %s", _workingUserState) @@ -625,21 +571,19 @@ case class UserStateWorker(userId: String, * @param instanceId * @return */ - def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = { + def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = { // implicitly issued events are checked first implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match { - case just @ Just(_) ⇒ - just - case NoVal ⇒ + case some @ Some(_) ⇒ + some + case None ⇒ // explicit previous resource events are checked second previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { - case just @ Just(_) ⇒ - just - case noValOrFailed ⇒ - noValOrFailed + case some @ Some(_) ⇒ + some + case _ ⇒ + None } - case failed ⇒ - failed } } @@ -691,8 +635,8 @@ case class UserStateWorker(userId: String, def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = { val resourceEvents = map.valuesIterator for { - resourceEvent <- resourceEvents - dslResource <- resourcesMap.findResource(resourceEvent.safeResource) + resourceEvent ← resourceEvents + dslResource ← resourcesMap.findResource(resourceEvent.safeResource) costPolicy = dslResource.costPolicy } { if(costPolicy.supportsImplicitEvents) { diff --git a/src/main/scala/gr/grnet/aquarium/util/package.scala b/src/main/scala/gr/grnet/aquarium/util/package.scala index cd6b2a4..e4e94b0 100644 --- a/src/main/scala/gr/grnet/aquarium/util/package.scala +++ b/src/main/scala/gr/grnet/aquarium/util/package.scala @@ -146,14 +146,8 @@ package object util { } @inline - def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Maybe[B] = Maybe { - map.get(key) match { - case Some(value) ⇒ - map -= key - value - case None ⇒ - null.asInstanceOf[B] - } + def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Option[B] = { + map.remove(key) } // Dear scalac. Optimize this. diff --git a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala index 7335f2f..615424f 100644 --- a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala +++ b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala @@ -149,7 +149,7 @@ aquariumpolicy: @inline private[this] def hrs(millis: Double) = millis / 1000 / 60 / 60 - def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe { + def apply(vars: Map[DSLCostPolicyVar, Any]): Double = { vars.apply(DSLCostPolicyNameVar) match { case DSLCostPolicyNames.continuous ⇒ val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double] @@ -194,8 +194,8 @@ aquariumpolicy: } val DefaultCompiler = new CostPolicyAlgorithmCompiler { - def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = { - Just(DefaultAlgorithm) + def compile(definition: String): ExecutableCostPolicyAlgorithm = { + DefaultAlgorithm } } //val DefaultAlgorithm = justForSure(DefaultCompiler.compile("")).get // hardcoded since we know exactly what this is @@ -349,9 +349,8 @@ aquariumpolicy: showResourceEvents(clog) - val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan) - val userState = justUserState(userStateM) - + val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan) + showUserState(clog, userState) expectCredits(clog, credits, userState) @@ -379,8 +378,7 @@ aquariumpolicy: showResourceEvents(clog) - val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan) - val userState = justUserState(userStateM) + val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan) showUserState(clog, userState) @@ -410,8 +408,7 @@ aquariumpolicy: showResourceEvents(clog) - val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan) - val userState = justUserState(userStateM) + val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan) showUserState(clog, userState) @@ -462,8 +459,8 @@ aquariumpolicy: clog.debugMap("DefaultResourcesMap", DefaultResourcesMap.map, 1) - val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan) - val userState = justUserState(userStateM) + val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan) + showUserState(clog, userState) clog.end() -- 1.7.10.4