2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.user
39 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
40 import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
41 import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
42 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
43 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
44 import gr.grnet.aquarium.store.{RecordID, StoreProvider, PolicyStore, UserStateStore, ResourceEventStore}
45 import gr.grnet.aquarium.logic.accounting.{Chargeslot, Accounting}
46 import scala.collection.mutable
47 import gr.grnet.aquarium.logic.events.ResourceEvent._
48 import gr.grnet.aquarium.logic.accounting.algorithm.{CostPolicyAlgorithmCompiler, SimpleCostPolicyAlgorithmCompiler}
52 * @author Christos KK Loverdos <loverdos@gmail.com>
54 class UserStateComputations extends Loggable {
/**
 * Builds the very first state of a user.
 *
 * Every snapshot created below is stamped with `userCreationMillis` (aliased as `now`). The implicitly
 * issued and the latest resource event snapshots start with an empty list, the owned resources snapshot
 * starts with `Nil`, and the agreement snapshot holds the single entry
 * `Agreement(agreementName, userCreationMillis)`.
 *
 * NOTE(review): the body reads `isActive` and `credits`; presumably they are parameters declared
 * between `userCreationMillis` and `roleNames` -- confirm against the complete file.
 *
 * @param userId             identifier of the user
 * @param userCreationMillis creation time of the user in millis; also the timestamp of every snapshot
 * @param roleNames          role names for the roles snapshot; empty by default
 * @param agreementName      name of the initial agreement; defaults to `DSLAgreement.DefaultAgreementName`
 */
55 def createInitialUserState(userId: String,
56 userCreationMillis: Long,
59 roleNames: List[String] = List(),
60 agreementName: String = DSLAgreement.DefaultAgreementName) = {
// All snapshots of an initial state share the creation time of the user as their timestamp.
61 val now = userCreationMillis
69 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
72 LatestResourceEventsSnapshot(List(), now),
75 ActiveStateSnapshot(isActive, now),
76 CreditSnapshot(credits, now),
77 AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
78 RolesSnapshot(roleNames, now),
79 OwnedResourcesSnapshot(Nil, now),
// Change-reason code and change-reason object of an initial calculation.
81 UserStateChangeReasonCodes.InitialCalculationCode,
82 InitialUserStateCalculation
/**
 * Creates an initial user state out of an existing one.
 *
 * Forwards to `createInitialUserState` the creation time, active flag, credit amount and roles of `us`,
 * plus the last element produced by `us.agreementsSnapshot.agreementsByTimeslot.valuesIterator`.
 *
 * NOTE(review): `.toList.last` throws `NoSuchElementException` when there are no agreements, and "last"
 * follows the iteration order of `agreementsByTimeslot` -- presumably that is chronological; verify.
 *
 * @param us the user state the values are taken from
 * @return the new initial `UserState`
 */
86 def createInitialUserStateFrom(us: UserState): UserState = {
87 createInitialUserState(
89 us.userCreationMillis,
90 us.activeStateSnapshot.isActive,
91 us.creditsSnapshot.creditAmount,
92 us.rolesSnapshot.roles,
93 us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
/**
 * Finds the state of the user at the end of the given billing month, (re)computing it when needed.
 *
 * As far as the visible lines show:
 *  - If the billing month stops before the user was created, an initial state is derived from
 *    `currentUserState` and handed to `userStateStore.storeUserState2`.
 *  - Otherwise the user state store is asked for the latest state known for that year and month.
 *    No state found: a full computation is done through `doCompute`.
 *    Lookup failure: a warning is logged.
 *    State found: the out-of-sync events of the billing period are counted again and compared with
 *    the counter recorded in the found state. The state is then reused (with the caller's
 *    `calculationReason`), recomputed, or an inconsistency failure is built.
 *
 * NOTE(review): the expressions that end each branch are not visible here; presumably each branch
 * yields the `Maybe` it built or received -- confirm against the complete file.
 *
 * @param userId              identifier of the user
 * @param billingMonthInfo    the billing month of interest
 * @param storeProvider       gives access to the user state store and the resource event store
 * @param currentUserState    supplies `userCreationMillis` and the values of the initial state
 * @param defaultResourcesMap not referenced in the visible lines (presumably forwarded by `doCompute`)
 * @param accounting          not referenced in the visible lines (presumably forwarded by `doCompute`)
 * @param algorithmCompiler   not referenced in the visible lines (presumably forwarded by `doCompute`)
 * @param calculationReason   the reason of the caller; stamped on a reused cached state
 * @param contextualLogger    optional parent logger, `NoVal` by default
 * @return the user state at the end of the billing month, wrapped in a `Maybe`
 */
97 def findUserStateAtEndOfBillingMonth(userId: String,
98 billingMonthInfo: BillingMonthInfo,
99 storeProvider: StoreProvider,
100 currentUserState: UserState,
101 defaultResourcesMap: DSLResourcesMap,
102 accounting: Accounting,
103 algorithmCompiler: CostPolicyAlgorithmCompiler,
104 calculationReason: UserStateChangeReason,
105 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
107 val clog = ContextualLogger.fromOther(
110 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
// Full billing of this month; called below whenever no usable cached state is available.
113 def doCompute: Maybe[UserState] = {
114 doFullMonthlyBilling(
126 val userStateStore = storeProvider.userStateStore
127 val resourceEventStore = storeProvider.resourceEventStore
129 val userCreationMillis = currentUserState.userCreationMillis
130 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
131 val billingMonthStartMillis = billingMonthInfo.startMillis
132 val billingMonthStopMillis = billingMonthInfo.stopMillis
134 if(billingMonthStopMillis < userCreationMillis) {
135 // If the user did not exist for this billing month, piece of cake
136 clog.debug("User did not exist before %s", userCreationDateCalc)
138 // NOTE: Reason here will be: InitialUserStateCalculation
139 val initialUserState0 = createInitialUserStateFrom(currentUserState)
140 val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
142 clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
147 // Ask DB cache for the latest known user state for this billing period
148 val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
150 billingMonthInfo.year,
151 billingMonthInfo.month)
153 latestUserStateM match {
155 // Not found, must compute
156 clog.debug("No user state found from cache, will have to (re)compute")
157 val result = doCompute
161 case failed @ Failed(_, _) ⇒
162 clog.warn("Failure while quering cache for user state: %s", failed)
166 case Just(latestUserState) ⇒
167 // Found a "latest" user state but need to see if it is indeed the one true latest.
168 // For this reason, we must count the events again.
169 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
170 val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
172 billingMonthStartMillis,
173 billingMonthStopMillis)
175 actualOOSEventsCounterM match {
177 val errMsg = "No counter computed for out of sync events. Should at least be zero."
179 val result = Failed(new Exception(errMsg))
183 case failed @ Failed(_, _) ⇒
184 clog.warn("Failure while querying for out of sync events: %s", failed)
188 case Just(actualOOSEventsCounter) ⇒
// Positive: the store now has more out-of-sync events than the cached state recorded.
// Negative: it has fewer, which the code below reports as an inconsistent DB.
189 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
193 // NOTE: Keep the caller's calculation reason
194 Just(latestUserState.copyForChangeReason(calculationReason))
196 // We had more, so must recompute
199 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
200 val result = doCompute
206 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
208 val result = Failed(new Exception(errMsg))
/**
 * Renders a resource event for debug logging by delegating to `toDebugString(false)`.
 *
 * NOTE(review): the meaning of the `false` flag is defined by `ResourceEvent.toDebugString`, which is
 * not visible here.
 *
 * @param rcEvent the resource event to render
 */
218 def rcDebugInfo(rcEvent: ResourceEvent) = {
219 rcEvent.toDebugString(false)
/**
 * Processes one resource event on top of `startingUserState` and returns the resulting user state.
 *
 * Visible flow:
 *  1. The resource definition of the event is looked up in `userStateWorker.resourcesMap`; an unknown
 *     resource is only logged as a warning.
 *  2. The cost policy of the resource decides, based on the event value, whether the event is billable;
 *     a non billable event is logged and ignored.
 *  3. For a billable event the previous event is found and removed from the worker. If the cost policy
 *     needs a previous event and there is none, the event is recorded via `userStateWorker.updateIgnored`.
 *  4. Otherwise the new accumulating amount and the full chargeslots are computed, the sum of the
 *     chargeslot credits is subtracted from the old credits, and the working state is copied with the
 *     new credit snapshot and incremented counters.
 *  5. When `stateChangeReason.shouldStoreCalculatedWalletEntries` holds, a `NewWalletEntry` is appended
 *     to `walletEntriesBuffer`.
 *
 * Side effects: changes `userStateWorker` (find-and-remove, `updateIgnored`, `updatePrevious`) and
 * appends to `walletEntriesBuffer`.
 *
 * NOTE(review): `policyStore` and `algorithmCompiler` are not referenced in the visible lines;
 * presumably they are arguments of `accounting.computeFullChargeslots` -- confirm.
 *
 * @param startingUserState   the state to start from
 * @param userStateWorker     supplies accounting, the resources map and the previous events
 * @param currentResourceEvent the event to process
 * @param stateChangeReason   decides whether calculated wallet entries are collected
 * @param billingMonthInfo    supplies the year and month recorded in a new wallet entry
 * @param walletEntriesBuffer receives the new wallet entries
 * @param clogM               optional parent logger, `NoVal` by default
 * @return the user state after the event has been processed
 * @throws Exception when no chargeslots are computed or when their computation failed
 */
223 def processResourceEvent(startingUserState: UserState,
224 userStateWorker: UserStateWorker,
225 currentResourceEvent: ResourceEvent,
226 policyStore: PolicyStore,
227 stateChangeReason: UserStateChangeReason,
228 billingMonthInfo: BillingMonthInfo,
229 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
230 algorithmCompiler: CostPolicyAlgorithmCompiler,
231 clogM: Maybe[ContextualLogger] = NoVal): UserState = {
// NOTE(review): the logger context is labelled "walletEntriesForResourceEvent" although this method is
// named processResourceEvent -- presumably a leftover of an earlier name.
233 val clog = ContextualLogger.fromOther(clogM, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
235 var _workingUserState = startingUserState
237 val theResource = currentResourceEvent.safeResource
238 val theInstanceId = currentResourceEvent.safeInstanceId
239 val theValue = currentResourceEvent.value
241 val accounting = userStateWorker.accounting
242 val resourcesMap = userStateWorker.resourcesMap
244 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
245 clog.begin(currentResourceEventDebugInfo)
247 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
249 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
250 // But to make this decision, first we need the resource definition (and its cost policy).
251 val dslResourceOpt = resourcesMap.findResource(theResource)
252 dslResourceOpt match {
253 // We have a resource (and thus a cost policy)
254 case Some(dslResource) ⇒
255 val costPolicy = dslResource.costPolicy
256 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
257 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
259 // The resource event is not billable
260 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
262 // The resource event is billable
263 // Find the previous event.
264 // This is (potentially) needed to calculate new credit amount and new resource instance amount
265 val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
266 clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
268 val havePreviousResourceEvent = previousResourceEventM.isJust
269 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
270 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
271 // This must be the first resource event of its kind, ever.
272 // TODO: We should normally check the DB to verify the claim (?)
273 clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
274 userStateWorker.updateIgnored(currentResourceEvent)
276 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
277 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
278 val oldCredits = _workingUserState.creditsSnapshot.creditAmount
280 // A. Compute new resource instance accumulating amount
281 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
283 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
285 // B. Compute new wallet entries
286 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
287 val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
289 // clog.debug("Computing full chargeslots")
290 val fullChargeslotsM = accounting.computeFullChargeslots(
291 previousResourceEventM,
292 currentResourceEvent,
304 // We have the chargeslots, let's associate them with the current event
305 fullChargeslotsM match {
306 case Just((referenceTimeslot, fullChargeslots)) ⇒
307 if(fullChargeslots.length == 0) {
308 // At least one chargeslot is required.
309 throw new Exception("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
311 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
313 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): `.get` assumes every chargeslot carries computed credits -- TODO confirm it cannot be
// empty at this point.
314 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
315 val newCredits = oldCredits - newCreditsDiff
317 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
318 val newWalletEntry = NewWalletEntry(
319 userStateWorker.userId,
323 TimeHelpers.nowMillis,
325 billingMonthInfo.year,
326 billingMonthInfo.month,
// The previous event is unwrapped only when `havePreviousResourceEvent` (its `isJust`) is true.
327 if(havePreviousResourceEvent)
328 List(currentResourceEvent, justForSure(previousResourceEventM).get)
330 List(currentResourceEvent),
333 currentResourceEvent.isSynthetic
335 clog.debug("New %s", newWalletEntry)
337 walletEntriesBuffer += newWalletEntry
339 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
// The credit snapshot is stamped with the current wall-clock time, not with the time of the event.
342 _workingUserState = _workingUserState.copy(
343 creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis),
344 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
345 totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
349 // At least one chargeslot is required.
350 throw new Exception("No chargeslots computed")
// A failed chargeslot computation is rethrown with its original message and cause.
352 case failed@Failed(e, m) ⇒
353 throw new Exception(m, e)
358 // After processing, all events billable or not update the previous state
359 userStateWorker.updatePrevious(currentResourceEvent)
361 // We do not have a resource (and thus, no cost policy)
363 // Now, this is a matter of politics: what do we do if no policy was found?
364 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
365 } // dslResourceOpt match
367 clog.end(currentResourceEventDebugInfo)
/**
 * Processes a collection of resource events one after the other, in traversal order.
 *
 * The working state starts as `startingUserState` and is replaced by the result of
 * `processResourceEvent` for each event.
 *
 * NOTE(review): the remaining arguments of the `processResourceEvent` call and the returned expression
 * are not visible here; presumably the other parameters are forwarded unchanged and the final working
 * state is returned -- confirm.
 *
 * @param resourceEvents    the events to process
 * @param startingUserState the state before the first event
 * @return the user state after the events have been processed
 */
372 def processResourceEvents(resourceEvents: Traversable[ResourceEvent],
373 startingUserState: UserState,
374 userStateWorker: UserStateWorker,
375 policyStore: PolicyStore,
376 stateChangeReason: UserStateChangeReason,
377 billingMonthInfo: BillingMonthInfo,
378 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
379 algorithmCompiler: CostPolicyAlgorithmCompiler,
380 clogM: Maybe[ContextualLogger] = NoVal): UserState = {
382 var _workingUserState = startingUserState
// Each iteration feeds the state produced by the previous event into the next one.
384 for(currentResourceEvent <- resourceEvents) {
386 _workingUserState = processResourceEvent(
389 currentResourceEvent,
/**
 * Computes the user state for a whole billing month.
 *
 * Visible flow:
 *  1. The state at the end of the previous billing month is obtained through
 *     `findUserStateAtEndOfBillingMonth`; a missing or failed result makes this method throw.
 *  2. That state, copied for the caller's `calculationReason`, becomes the working state and the
 *     source of a `UserStateWorker`.
 *  3. The relevant resource events of the month are loaded from the resource event store and
 *     processed with `processResourceEvents`.
 *  4. Events that must generate an implicit end are taken out of the worker and, when there are any,
 *     a second `processResourceEvents` pass runs with a dedicated worker built from them.
 *  5. The working state is copied with the snapshots of the worker, an incremented change counter,
 *     the id of the starting state as parent and the collected wallet entries.
 *  6. When `calculationReason.shouldStoreUserState` holds, the state is stored; a store problem is
 *     only logged.
 *
 * NOTE(review): the whole body is the argument of `Maybe { ... }`; presumably an exception thrown inside
 * becomes a `Failed` result -- confirm against the `Maybe` library.
 *
 * @param userId              identifier of the user
 * @param billingMonthInfo    the billing month to bill
 * @param storeProvider       gives access to the user state, resource event and policy stores
 * @param currentUserState    not referenced in the visible lines (presumably forwarded in step 1)
 * @param defaultResourcesMap resources map handed to the `UserStateWorker`
 * @param accounting          accounting logic handed to the `UserStateWorker`
 * @param algorithmCompiler   not referenced in the visible lines (presumably forwarded to the processing calls)
 * @param calculationReason   reason of this calculation, `NoSpecificChangeReason` by default
 * @param contextualLogger    optional parent logger, `NoVal` by default
 * @return the computed user state wrapped in a `Maybe`
 */
403 def doFullMonthlyBilling(userId: String,
404 billingMonthInfo: BillingMonthInfo,
405 storeProvider: StoreProvider,
406 currentUserState: UserState,
407 defaultResourcesMap: DSLResourcesMap,
408 accounting: Accounting,
409 algorithmCompiler: CostPolicyAlgorithmCompiler,
410 calculationReason: UserStateChangeReason = NoSpecificChangeReason,
411 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
414 val clog = ContextualLogger.fromOther(
417 "doFullMonthlyBilling(%s)", billingMonthInfo)
420 val clogJ = Just(clog)
// The previous month is resolved first; it may in turn trigger a full billing of that month.
422 val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
424 billingMonthInfo.previousMonth,
430 calculationReason.forPreviousBillingMonth,
434 if(previousBillingMonthUserStateM.isNoVal) {
435 throw new Exception("Could not calculate initial user state for billing %s".format(billingMonthInfo))
437 if(previousBillingMonthUserStateM.isFailed) {
438 throw failedForSure(previousBillingMonthUserStateM).exception
// NoVal and Failed have been ruled out above.
441 val startingUserState = justForSure(previousBillingMonthUserStateM).get
443 val userStateStore = storeProvider.userStateStore
444 val resourceEventStore = storeProvider.resourceEventStore
445 val policyStore = storeProvider.policyStore
447 val billingMonthStartMillis = billingMonthInfo.startMillis
448 val billingMonthEndMillis = billingMonthInfo.stopMillis
450 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
451 // specified in the parameters.
452 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
453 var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
455 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
457 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
459 // First, find and process the actual resource events from DB
460 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
462 billingMonthStartMillis,
463 billingMonthEndMillis)
// Collects the wallet entries of both processing passes below.
465 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
467 _workingUserState = processResourceEvents(
468 allResourceEventsForMonth,
479 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
480 // ... in order to generate an implicit ON later
481 val (specialEvents, theirImplicitEnds) = userStateWorker.
482 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
// `lengthCompare(1) >= 0` is true when the list has at least one element.
483 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
485 clog.debug("Process implicitly issued events")
486 clog.debugSeq("specialEvents", specialEvents, 0)
487 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
490 // Now, the previous and implicitly started must be our base for the following computation, so we create an
491 // appropriate worker
492 val specialUserStateWorker = UserStateWorker(
493 userStateWorker.userId,
494 LatestResourceEventsWorker.fromList(specialEvents),
495 ImplicitlyIssuedResourceEventsWorker.Empty,
496 IgnoredFirstResourceEventsWorker.Empty,
497 userStateWorker.accounting,
498 userStateWorker.resourcesMap
501 _workingUserState = processResourceEvents(
504 specialUserStateWorker,
513 val lastUpdateTime = TimeHelpers.nowMillis
// The snapshots are taken from `userStateWorker`, not from the dedicated worker of the second pass.
515 _workingUserState = _workingUserState.copy(
516 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
517 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
518 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
519 parentUserStateId = startingUserState.idOpt,
520 newWalletEntries = newWalletEntries.toList
523 clog.debug("calculationReason = %s", calculationReason)
525 if(calculationReason.shouldStoreUserState) {
526 val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
527 storedUserStateM match {
528 case Just(storedUserState) ⇒
529 clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
// Continue with the stored instance of the state.
530 _workingUserState = storedUserState
// NOTE(review): a store problem is only logged in the visible lines; presumably the unsaved working
// state is still returned -- confirm.
532 clog.warn("Could not store %s", _workingUserState)
533 case failed @ Failed(e, m) ⇒
534 clog.error(e, "Could not store %s", _workingUserState)
538 clog.debug("RETURN %s", _workingUserState)
545 * A helper object holding intermediate state/results during resource event processing.
547 * @param previousResourceEvents
548 * This is a collection of all the latest resource events.
549 * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
550 * ones. Will be updated on processing the next resource event.
552 * @param implicitlyIssuedStartEvents
553 * The implicitly issued resource events at the beginning of the billing period.
555 * @param ignoredFirstResourceEvents
556 * The resource events that were first (and unused) of their kind.
558 * @author Christos KK Loverdos <loverdos@gmail.com>
560 case class UserStateWorker(userId: String,
561 previousResourceEvents: LatestResourceEventsWorker,
562 implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
563 ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
564 accounting: Accounting,
565 resourcesMap: DSLResourcesMap) {
// NOTE: although this is a case class, the three event workers are changed in place by the
// find-and-remove and update methods of this class.
568 * Finds the previous resource event by checking two possible sources: a) The implicitly issued resource
569 * events and b) the explicit previous resource events. If the event is found, it is removed from the
572 * If the event is not found, then this must be for a new resource instance.
573 * (and probably then some `zero` resource event must be implied as the previous one)
579 def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
// Both lookups go through `findAndRemoveResourceEvent`, so (per that name) an event that is found is
// also taken out of the worker it was found in.
// NOTE(review): the branch bodies are not visible here; presumably a `Just` is returned as is and the
// second source is consulted only when the first one has no event -- confirm.
580 // implicitly issued events are checked first
581 implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
582 case just @ Just(_) ⇒
585 // explicit previous resource events are checked second
586 previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
587 case just @ Just(_) ⇒
/**
 * Records the given event in `ignoredFirstResourceEvents`.
 *
 * @param resourceEvent the event to record
 */
597 def updateIgnored(resourceEvent: ResourceEvent): Unit = {
598 ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
/**
 * Records the given event in `previousResourceEvents`.
 *
 * @param resourceEvent the event to record
 */
601 def updatePrevious(resourceEvent: ResourceEvent): Unit = {
602 previousResourceEvents.updateResourceEvent(resourceEvent)
/**
 * Logs the contents of the three event workers for debugging.
 *
 * A worker is logged only when its `size` is greater than zero. Its map is logged through
 * `clog.debugMap` with every event replaced by the string that `rcDebugInfo` produces for it.
 *
 * @param clog        the logger that receives the maps
 * @param rcDebugInfo renders a resource event as a string
 */
605 def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = {
606 if(previousResourceEvents.size > 0) {
607 val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
608 clog.debugMap("previousResourceEvents", map, 0)
610 if(implicitlyIssuedStartEvents.size > 0) {
611 val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
// NOTE: the label says "implicitlyTerminated..." but the map logged here is the one of
// `implicitlyIssuedStartEvents`.
612 clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
614 if(ignoredFirstResourceEvents.size > 0) {
615 val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
616 clog.debugMap("ignoredFirstResourceEvents", map, 0)
621 // def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
622 // val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
624 // buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
625 // buffer ++= previousResourceEvents.latestEventsMap
627 // buffer.valuesIterator.toList
631 * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
632 * end events along with those implicitly issued events. Before returning, remove the events that generated the
633 * implicit ends from the internal state of this instance.
635 * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
637 def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
638 ): (List[ResourceEvent], List[ResourceEvent]) = {
// Pairs of (generator event, implicit end event constructed for it), in discovery order.
639 val buffer = mutable.ListBuffer[(ResourceEvent, ResourceEvent)]()
// Events already paired, so that an event present in both maps contributes a single pair.
640 val checkSet = mutable.Set[ResourceEvent]()
// Scans one event map. Events whose resource is unknown to `resourcesMap` are skipped by the
// for-comprehension generator below.
642 def doItFor(map: ResourceEvent.FullMutableResourceTypeMap): Unit = {
643 val resourceEvents = map.valuesIterator
645 resourceEvent <- resourceEvents
646 dslResource <- resourcesMap.findResource(resourceEvent.safeResource)
647 costPolicy = dslResource.costPolicy
649 if(costPolicy.supportsImplicitEvents) {
650 if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
651 val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
653 if(!checkSet.contains(resourceEvent)) {
654 checkSet.add(resourceEvent)
655 buffer append ((resourceEvent, implicitEnd))
// NOTE(review): this removal appears to run while `map.valuesIterator` is still being consumed;
// removing from a mutable map during its own iteration is not guaranteed to be safe -- confirm,
// or collect the keys first and remove them after the loop.
659 map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
665 doItFor(previousResourceEvents.latestEventsMap) // we give priority for previous
666 doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
// First list: the generator events; second list: their implicit ends, in the same order.
668 (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
672 object UserStateWorker {
673 def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
676 userState.latestResourceEventsSnapshot.toMutableWorker,
677 userState.implicitlyIssuedSnapshot.toMutableWorker,
678 IgnoredFirstResourceEventsWorker.Empty,