import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
import gr.grnet.aquarium.actor.message.admin.PingAllRequest
import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
/**
* Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
UserActorCache.get(userID) match {
case Some(userActorRef) ⇒
userActorRef
+
case None ⇒
_launchUserActor(userID)
}
}
private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
- try {
- _findOrCreateUserActor(userID) forward m
+ _findOrCreateUserActor(userID) forward m
+ }
+
+
+ /**
+ * Handles an exception that occurred while servicing a message.
+ *
+ * @param t
+ * The exception.
+ * @param servicingMessage
+ * The message that was being served while the exception happened.
+ * Note that the message can be `null`, in which case the exception
+ * is an NPE.
+ */
+ override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
+ logChainOfCauses(t)
+
+ def logIgnore(e: Throwable) = {
+ logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
+ }
+
+ t match {
+ case e: Error ⇒
+ throw e
+
+ case e: AquariumInternalError ⇒
+ logIgnore(e)
+
+ case e: AquariumException ⇒
+ logIgnore(e)
- } catch { case t: Throwable ⇒
- logger.error("While forwarding to user actor for userID = %s".format(userID), t)
- // FIXME: We have a message that never gets to the user actor.
- // FIXME: We should probably shut the user actor down.
+ case e: Throwable ⇒
+ logIgnore(e)
}
}
val now = TimeHelpers.nowMillis()
val imEvent = event.imEvent
- val isUpdate = if(_haveIMState) {
+ val hadIMState = _haveIMState
+
+ if(hadIMState) {
val newOccurredMillis = imEvent.occurredMillis
val currentOccurredMillis = this._imState.imEvent.occurredMillis
return
}
-
- true
- } else {
- false
}
this._imState = IMStateSnapshot(imEvent, now)
- DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
+ DEBUG("%s %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState))
}
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
}
def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+ val rcEvent = event.rcEvent
+
+ logger.info("Got\n{}", rcEvent.toJsonString)
}
import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
import gr.grnet.aquarium.store.PolicyStore
-import gr.grnet.aquarium.AquariumException
import gr.grnet.aquarium.event.{WalletEntry}
import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
/**
* A timeslot together with the algorithm and unit price that apply for this particular timeslot.
dslResource: DSLResource,
policiesByTimeslot: Map[Timeslot, DSLPolicy],
agreementNamesByTimeslot: Map[Timeslot, String],
- clogOpt: Option[ContextualLogger] = None): Maybe[List[Chargeslot]] = Maybe {
+ clogOpt: Option[ContextualLogger] = None): List[Chargeslot] = {
val clog = ContextualLogger.fromOther(clogOpt, logger, "computeInitialChargeslots()")
// clog.begin()
* Compute the charge slots generated by a particular resource event.
*
*/
- def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEventModel],
+ def computeFullChargeslots(previousResourceEventOpt: Option[ResourceEventModel],
currentResourceEvent: ResourceEventModel,
oldCredits: Double,
oldTotalAmount: Double,
agreementNamesByTimeslot: Map[Timeslot, String],
algorithmCompiler: CostPolicyAlgorithmCompiler,
policyStore: PolicyStore,
- clogOpt: Option[ContextualLogger] = None): Maybe[(Timeslot, List[Chargeslot])] = Maybe {
+ clogOpt: Option[ContextualLogger] = None): (Timeslot, List[Chargeslot]) = {
val clog = ContextualLogger.fromOther(clogOpt, logger, "computeFullChargeslots()")
// clog.begin()
val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
// We need a previous event
case true ⇒
- previousResourceEventM match {
+ previousResourceEventOpt match {
// We have a previous event
- case Just(previousResourceEvent) ⇒
+ case Some(previousResourceEvent) ⇒
// clog.debug("Have previous event")
// clog.debug("previousValue = %s", previousResourceEvent.value)
(referenceTimeslot, relevantPolicies, previousResourceEvent.value)
// We do not have a previous event
- case NoVal ⇒
+ case None ⇒
throw new AquariumException(
"Unable to charge. No previous event given for %s".
format(currentResourceEvent.toDebugString()))
-
- // We could not obtain a previous event
- case failed @ Failed(e) ⇒
- throw new AquariumException(
- "Unable to charge. Could not obtain previous event for %s".
- format(currentResourceEvent.toDebugString()), e)
}
// We do not need a previous event
// clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
// clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
- val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl)
+ val relevantPolicyOpt = policyStore.loadValidPolicyAt(occurredMillis, dsl)
// clog.debug(" ==> relevantPolicyM = %s", relevantPolicyM)
- val relevantPolicies = relevantPolicyM match {
- case Just(relevantPolicy) ⇒
+ val relevantPolicies = relevantPolicyOpt match {
+ case Some(relevantPolicy) ⇒
Map(referenceTimeslot -> relevantPolicy)
- case NoVal ⇒
- throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot))
- case failed @ Failed(e) ⇒
- throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot), e)
+ case None ⇒
+ throw new AquariumInternalError("No relevant policy found for %s".format(referenceTimeslot))
}
(referenceTimeslot, relevantPolicies, previousValue)
}
- val initialChargeslotsM = computeInitialChargeslots(
+ val initialChargeslots = computeInitialChargeslots(
referenceTimeslot,
dslResource,
relevantPolicies,
Some(clog)
)
- val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
- chargeslots.map {
- case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
- val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
- execAlgorithmM match {
- case NoVal ⇒
- throw new AquariumException("Could not compile algorithm %s".format(algorithmDefinition))
-
- case failed @ Failed(e) ⇒
- failed.throwMe
-
- case Just(execAlgorithm) ⇒
- val valueMap = costPolicy.makeValueMap(
- oldCredits,
- oldTotalAmount,
- newTotalAmount,
- stopMillis - startMillis,
- previousValue,
- currentResourceEvent.value,
- unitPrice
- )
+ val fullChargeslots = initialChargeslots.map {
+ case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
+ val execAlgorithm = algorithmCompiler.compile(algorithmDefinition)
+ val valueMap = costPolicy.makeValueMap(
+ oldCredits,
+ oldTotalAmount,
+ newTotalAmount,
+ stopMillis - startMillis,
+ previousValue,
+ currentResourceEvent.value,
+ unitPrice
+ )
// clog.debug("execAlgorithm = %s", execAlgorithm)
- clog.debugMap("valueMap", valueMap, 1)
-
- // This is it
- val creditsM = execAlgorithm.apply(valueMap)
-
- creditsM match {
- case NoVal ⇒
- throw new AquariumException(
- "Could not compute credits for resource %s during %s".
- format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
-
- case failed @ Failed(e) ⇒
- failed.throwMe
+ clog.debugMap("valueMap", valueMap, 1)
- case Just(credits) ⇒
- chargeslot.copy(computedCredits = Some(credits))
- }
- }
- }
- }
-
- val result = fullChargeslotsM match {
- case Just(fullChargeslots) ⇒
- referenceTimeslot -> fullChargeslots
- case NoVal ⇒
- null
- case failed @ Failed(e) ⇒
- failed.throwMe
+ // This is it
+ val credits = execAlgorithm.apply(valueMap)
+ chargeslot.copy(computedCredits = Some(credits))
}
-// clog.end()
+ val result = referenceTimeslot -> fullChargeslots
result
}
* @param definition the textual representation of the algorithm
* @return the executable form of the algorithm
*/
- def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm]
+ def compile(definition: String): ExecutableCostPolicyAlgorithm
}
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] => Maybe[Double])
+trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] ⇒ Double)
* @param definition the textual representation of the algorithm
* @return the executable form of the algorithm
*/
- def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = {
- Just(SimpleExecutableCostPolicyAlgorithm)
+ def compile(definition: String): ExecutableCostPolicyAlgorithm = {
+ SimpleExecutableCostPolicyAlgorithm
}
}
@inline private[this]
def hrs(millis: Double) = millis / 1000 / 60 / 60
- def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe {
+ def apply(vars: Map[DSLCostPolicyVar, Any]): Double = {
vars.apply(DSLCostPolicyNameVar) match {
case DSLCostPolicyNames.continuous ⇒
val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double]
yield (timeslot, dsl.parse(policyEntry.policyYAML))
}
- def loadValidPolicyEntryAt(atMillis: Long): Maybe[PolicyEntry] = Maybe {
+ def loadValidPolicyEntryAt(atMillis: Long): Option[PolicyEntry] = {
loadPolicyEntriesAfter(0L).find { policyEntry ⇒
policyEntry.fromToTimeslot.containsTimeInMillis(atMillis)
- } match {
- case Some(policyEntry) ⇒
- policyEntry
- case None ⇒
- null // Do not worry, this will be transformed to a NoVal by the Maybe polymorphic constructor
}
}
- def loadValidPolicyAt(atMillis: Long, dsl: DSL): Maybe[DSLPolicy] = {
+ def loadValidPolicyAt(atMillis: Long, dsl: DSL): Option[DSLPolicy] = {
loadValidPolicyEntryAt(atMillis).map(policyEntry ⇒ dsl.parse(policyEntry.policyYAML))
}
/**
* Count and return the number of "out of sync" events for a billing month.
*/
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long]
+ def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long
/**
* Finds all relevant resource events for the billing period.
*/
def insertUserState(userState: UserState): UserState
- def insertUserState2(userState: UserState): Maybe[UserState] = {
- Maybe { insertUserState(userState) }
- }
-
/**
* Find a state by user ID
*/
}.toList
}
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = Maybe {
+ def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
_resourceEvents.filter { case ev ⇒
// out of sync events are those that were received in the billing month but occurred in previous (or next?)
// months
MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
}
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
- Maybe {
- // FIXME: Implement
- 0L
- }
+ def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
+ // FIXME: Implement
+ 0L
}
def findAllRelevantResourceEventsForBillingPeriod(userId: String,
latestEventsMap((resourceEvent.resource, resourceEvent.instanceID)) = resourceEvent
}
- def findResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
- findFromMapAsMaybe(latestEventsMap, (resource, instanceId))
+ def findResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
+ latestEventsMap.get((resource, instanceId))
}
- def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+ def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
findAndRemoveFromMap(latestEventsMap, (resource, instanceId))
}
def toImmutableSnapshot(snapshotTime: Long) =
ImplicitlyIssuedResourceEventsSnapshot(toList, snapshotTime)
- def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+ def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
findAndRemoveFromMap(implicitlyIssuedEventsMap, (resource, instanceId))
}
import scala.collection.mutable
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
import gr.grnet.aquarium.logic.accounting.Accounting
import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.event.{NewWalletEntry}
+import gr.grnet.aquarium.event.NewWalletEntry
import gr.grnet.aquarium.event.resource.ResourceEventModel
import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
-import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
/**
accounting: Accounting,
algorithmCompiler: CostPolicyAlgorithmCompiler,
calculationReason: UserStateChangeReason,
- clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = {
+ clogOpt: Option[ContextualLogger] = None): UserState = {
val clog = ContextualLogger.fromOther(
clogOpt,
"findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
clog.begin()
- def doCompute: Maybe[UserState] = {
+ def doCompute: UserState = {
doFullMonthlyBilling(
userId,
billingMonthInfo,
// NOTE: Reason here will be: InitialUserStateSetup$
val initialUserState0 = createInitialUserStateFrom(currentUserState)
- val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
+ val initialUserState1 = userStateStore.insertUserState(initialUserState0)
- clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
+ clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
clog.end()
- initialUserStateM
+ initialUserState1
} else {
// Ask DB cache for the latest known user state for this billing period
- val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
+ val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
userId,
billingMonthInfo.year,
- billingMonthInfo.month) match {
+ billingMonthInfo.month)
- case Some(latestUserState) ⇒
- latestUserState
+ latestUserStateOpt match {
case None ⇒
- null
- }}
-
- latestUserStateM match {
- case NoVal ⇒
// Not found, must compute
clog.debug("No user state found from cache, will have to (re)compute")
val result = doCompute
clog.end()
result
- case failed @ Failed(e) ⇒
- clog.warn("Failure while quering cache for user state: %s", failed)
- clog.end()
- failed
-
- case Just(latestUserState) ⇒
+ case Some(latestUserState) ⇒
// Found a "latest" user state but need to see if it is indeed the true and one latest.
// For this reason, we must count the events again.
val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
- val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+ val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
userId,
billingMonthStartMillis,
billingMonthStopMillis)
- actualOOSEventsCounterM match {
- case NoVal ⇒
- val errMsg = "No counter computed for out of sync events. Should at least be zero."
- clog.warn(errMsg)
- val result = Failed(new AquariumException(errMsg))
+ val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+ counterDiff match {
+ // ZERO, we are OK!
+ case 0 ⇒
+ // NOTE: Keep the caller's calculation reason
+ latestUserState.copyForChangeReason(calculationReason)
+
+ // We had more, so must recompute
+ case n if n > 0 ⇒
+ clog.debug(
+ "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+ val result = doCompute
clog.end()
result
- case failed @ Failed(_) ⇒
- clog.warn("Failure while querying for out of sync events: %s", failed)
- clog.end()
- failed
-
- case Just(actualOOSEventsCounter) ⇒
- val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
- counterDiff match {
- // ZERO, we are OK!
- case 0 ⇒
- // NOTE: Keep the caller's calculation reason
- Just(latestUserState.copyForChangeReason(calculationReason))
-
- // We had more, so must recompute
- case n if n > 0 ⇒
- clog.debug(
- "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
- val result = doCompute
- clog.end()
- result
-
- // We had less????
- case n if n < 0 ⇒
- val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
- clog.warn(errMsg)
- val result = Failed(new AquariumException(errMsg))
- clog.end()
- result
- }
+ // We had less????
+ case n if n < 0 ⇒
+ val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+ clog.warn(errMsg)
+ throw new AquariumException(errMsg)
}
}
}
// The resource event is billable
// Find the previous event.
// This is (potentially) needed to calculate new credit amount and new resource instance amount
- val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
- clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
+ val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+ clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
- val havePreviousResourceEvent = previousResourceEventM.isJust
+ val havePreviousResourceEvent = previousResourceEventOpt.isDefined
val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
if(needPreviousResourceEvent && !havePreviousResourceEvent) {
// This must be the first resource event of its kind, ever.
// TODO: We should normally check the DB to verify the claim (?)
- clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+ clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
userStateWorker.updateIgnored(currentResourceEvent)
} else {
val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
// clog.debug("Computing full chargeslots")
- val fullChargeslotsM = accounting.computeFullChargeslots(
- previousResourceEventM,
+ val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+ previousResourceEventOpt,
currentResourceEvent,
oldCredits,
oldAmount,
)
// We have the chargeslots, let's associate them with the current event
- fullChargeslotsM match {
- case Just((referenceTimeslot, fullChargeslots)) ⇒
- if(fullChargeslots.length == 0) {
- // At least one chargeslot is required.
- throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
- }
- clog.debugSeq("fullChargeslots", fullChargeslots, 0)
-
- // C. Compute new credit amount (based on the charge slots)
- val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
- val newCredits = oldCredits - newCreditsDiff
-
- if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
- val newWalletEntry = NewWalletEntry(
- userStateWorker.userId,
- newCreditsDiff,
- oldCredits,
- newCredits,
- TimeHelpers.nowMillis(),
- referenceTimeslot,
- billingMonthInfo.year,
- billingMonthInfo.month,
- if(havePreviousResourceEvent)
- List(currentResourceEvent, justForSure(previousResourceEventM).get)
- else
- List(currentResourceEvent),
- fullChargeslots,
- dslResource,
- currentResourceEvent.isSynthetic
- )
- clog.debug("New %s", newWalletEntry)
-
- walletEntriesBuffer += newWalletEntry
- } else {
- clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
- }
-
- _workingUserState = _workingUserState.copy(
- creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
- stateChangeCounter = _workingUserState.stateChangeCounter + 1,
- totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
- )
-
- case NoVal ⇒
- // At least one chargeslot is required.
- throw new AquariumException("No chargeslots computed")
-
- case failed@Failed(e) ⇒
- throw new AquariumException(e, "Error computing chargeslots")
+ if(fullChargeslots.length == 0) {
+ // At least one chargeslot is required.
+ throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
+ }
+ clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+ // C. Compute new credit amount (based on the charge slots)
+ val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+ val newCredits = oldCredits - newCreditsDiff
+
+ if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+ val newWalletEntry = NewWalletEntry(
+ userStateWorker.userId,
+ newCreditsDiff,
+ oldCredits,
+ newCredits,
+ TimeHelpers.nowMillis(),
+ referenceTimeslot,
+ billingMonthInfo.year,
+ billingMonthInfo.month,
+ if(havePreviousResourceEvent)
+ List(currentResourceEvent, previousResourceEventOpt.get)
+ else
+ List(currentResourceEvent),
+ fullChargeslots,
+ dslResource,
+ currentResourceEvent.isSynthetic
+ )
+ clog.debug("New %s", newWalletEntry)
+
+ walletEntriesBuffer += newWalletEntry
+ } else {
+ clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
}
+
+ _workingUserState = _workingUserState.copy(
+ creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+ stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+ totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+ )
}
}
var _workingUserState = startingUserState
- for(currentResourceEvent <- resourceEvents) {
+ for(currentResourceEvent ← resourceEvents) {
_workingUserState = processResourceEvent(
_workingUserState,
accounting: Accounting,
algorithmCompiler: CostPolicyAlgorithmCompiler,
calculationReason: UserStateChangeReason = NoSpecificChangeReason,
- clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = Maybe {
+ clogOpt: Option[ContextualLogger] = None): UserState = {
val clog = ContextualLogger.fromOther(
val clogSome = Some(clog)
- val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
+ val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
userId,
billingMonthInfo.previousMonth,
storeProvider,
clogSome
)
- if(previousBillingMonthUserStateM.isNoVal) {
- throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
- }
- if(previousBillingMonthUserStateM.isFailed) {
- throw failedForSure(previousBillingMonthUserStateM).exception
- }
-
- val startingUserState = justForSure(previousBillingMonthUserStateM).get
+ val startingUserState = previousBillingMonthUserState
val userStateStore = storeProvider.userStateStore
val resourceEventStore = storeProvider.resourceEventStore
clog.debug("calculationReason = %s", calculationReason)
if(calculationReason.shouldStoreUserState) {
- val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
- storedUserStateM match {
- case Just(storedUserState) ⇒
- clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
- _workingUserState = storedUserState
- case NoVal ⇒
- clog.warn("Could not store %s", _workingUserState)
- case failed @ Failed(e) ⇒
- clog.error(e, "Could not store %s", _workingUserState)
- }
+ val storedUserState = userStateStore.insertUserState(_workingUserState)
+ clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+ _workingUserState = storedUserState
}
clog.debug("RETURN %s", _workingUserState)
* @param instanceId
* @return
*/
- def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+ def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
// implicitly issued events are checked first
implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
- case just @ Just(_) ⇒
- just
- case NoVal ⇒
+ case some @ Some(_) ⇒
+ some
+ case None ⇒
// explicit previous resource events are checked second
previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
- case just @ Just(_) ⇒
- just
- case noValOrFailed ⇒
- noValOrFailed
+ case some @ Some(_) ⇒
+ some
+ case _ ⇒
+ None
}
- case failed ⇒
- failed
}
}
def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
val resourceEvents = map.valuesIterator
for {
- resourceEvent <- resourceEvents
- dslResource <- resourcesMap.findResource(resourceEvent.safeResource)
+ resourceEvent ← resourceEvents
+ dslResource ← resourcesMap.findResource(resourceEvent.safeResource)
costPolicy = dslResource.costPolicy
} {
if(costPolicy.supportsImplicitEvents) {
}
@inline
- def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Maybe[B] = Maybe {
- map.get(key) match {
- case Some(value) ⇒
- map -= key
- value
- case None ⇒
- null.asInstanceOf[B]
- }
+ def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Option[B] = {
+ map.remove(key)
}
// Dear scalac. Optimize this.
@inline private[this]
def hrs(millis: Double) = millis / 1000 / 60 / 60
- def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe {
+ def apply(vars: Map[DSLCostPolicyVar, Any]): Double = {
vars.apply(DSLCostPolicyNameVar) match {
case DSLCostPolicyNames.continuous ⇒
val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double]
}
val DefaultCompiler = new CostPolicyAlgorithmCompiler {
- def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = {
- Just(DefaultAlgorithm)
+ def compile(definition: String): ExecutableCostPolicyAlgorithm = {
+ DefaultAlgorithm
}
}
//val DefaultAlgorithm = justForSure(DefaultCompiler.compile("")).get // hardcoded since we know exactly what this is
showResourceEvents(clog)
- val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
- val userState = justUserState(userStateM)
-
+ val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
+
showUserState(clog, userState)
expectCredits(clog, credits, userState)
showResourceEvents(clog)
- val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
- val userState = justUserState(userStateM)
+ val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
showUserState(clog, userState)
showResourceEvents(clog)
- val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
- val userState = justUserState(userStateM)
+ val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
showUserState(clog, userState)
clog.debugMap("DefaultResourcesMap", DefaultResourcesMap.map, 1)
- val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
- val userState = justUserState(userStateM)
+ val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
+
showUserState(clog, userState)
clog.end()