2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.user
39 import scala.collection.mutable
40 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
41 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
42 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
43 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
44 import gr.grnet.aquarium.logic.accounting.Accounting
45 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
46 import gr.grnet.aquarium.event.NewWalletEntry
47 import gr.grnet.aquarium.event.resource.ResourceEventModel
48 import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
49 import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
53 * @author Christos KK Loverdos <loverdos@gmail.com>
55 class UserStateComputations extends Loggable {
56 def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
57 if(!imEvent.isCreateUser) {
58 throw new AquariumInternalError(
59 "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
62 val userID = imEvent.userID
63 val userCreationMillis = imEvent.occurredMillis
64 val now = TimeHelpers.nowMillis()
73 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
76 LatestResourceEventsSnapshot(List(), now),
79 IMStateSnapshot(imEvent, now),
80 CreditSnapshot(credits, now),
81 AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
82 OwnedResourcesSnapshot(Nil, now),
88 def createInitialUserState(userID: String,
89 userCreationMillis: Long,
92 roleNames: List[String] = List(),
93 agreementName: String = DSLAgreement.DefaultAgreementName) = {
94 val now = userCreationMillis
103 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
106 LatestResourceEventsSnapshot(List(), now),
114 isActive, roleNames.headOption.getOrElse("default"),
116 IMEventModel.EventTypeNames.create, Map()),
119 CreditSnapshot(credits, now),
120 AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
121 OwnedResourcesSnapshot(Nil, now),
123 InitialUserStateSetup
127 def createInitialUserStateFrom(us: UserState): UserState = {
128 createInitialUserState(
129 us.imStateSnapshot.imEvent,
130 us.creditsSnapshot.creditAmount,
131 us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
134 def findUserStateAtEndOfBillingMonth(userId: String,
135 billingMonthInfo: BillingMonthInfo,
136 storeProvider: StoreProvider,
137 currentUserState: UserState,
138 defaultResourcesMap: DSLResourcesMap,
139 accounting: Accounting,
140 algorithmCompiler: CostPolicyAlgorithmCompiler,
141 calculationReason: UserStateChangeReason,
142 clogOpt: Option[ContextualLogger] = None): UserState = {
144 val clog = ContextualLogger.fromOther(
147 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
150 def doCompute: UserState = {
151 doFullMonthlyBilling(
163 val userStateStore = storeProvider.userStateStore
164 val resourceEventStore = storeProvider.resourceEventStore
166 val userCreationMillis = currentUserState.userCreationMillis
167 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
168 val billingMonthStartMillis = billingMonthInfo.startMillis
169 val billingMonthStopMillis = billingMonthInfo.stopMillis
171 if(billingMonthStopMillis < userCreationMillis) {
172 // If the user did not exist for this billing month, piece of cake
173 clog.debug("User did not exist before %s", userCreationDateCalc)
175 // NOTE: Reason here will be: InitialUserStateSetup$
176 val initialUserState0 = createInitialUserStateFrom(currentUserState)
177 val initialUserState1 = userStateStore.insertUserState(initialUserState0)
179 clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
184 // Ask DB cache for the latest known user state for this billing period
185 val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
187 billingMonthInfo.year,
188 billingMonthInfo.month)
190 latestUserStateOpt match {
192 // Not found, must compute
193 clog.debug("No user state found from cache, will have to (re)compute")
194 val result = doCompute
198 case Some(latestUserState) ⇒
199 // Found a "latest" user state but need to see if it is indeed the true and one latest.
200 // For this reason, we must count the events again.
201 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
202 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
204 billingMonthStartMillis,
205 billingMonthStopMillis)
207 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
211 // NOTE: Keep the caller's calculation reason
212 latestUserState.copyForChangeReason(calculationReason)
214 // We had more, so must recompute
217 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
218 val result = doCompute
224 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
226 throw new AquariumException(errMsg)
233 def rcDebugInfo(rcEvent: ResourceEventModel) = {
234 rcEvent.toDebugString(false)
238 def processResourceEvent(startingUserState: UserState,
239 userStateWorker: UserStateWorker,
240 currentResourceEvent: ResourceEventModel,
241 policyStore: PolicyStore,
242 stateChangeReason: UserStateChangeReason,
243 billingMonthInfo: BillingMonthInfo,
244 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
245 algorithmCompiler: CostPolicyAlgorithmCompiler,
246 clogOpt: Option[ContextualLogger] = None): UserState = {
248 val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
250 var _workingUserState = startingUserState
252 val theResource = currentResourceEvent.safeResource
253 val theInstanceId = currentResourceEvent.safeInstanceId
254 val theValue = currentResourceEvent.value
256 val accounting = userStateWorker.accounting
257 val resourcesMap = userStateWorker.resourcesMap
259 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
260 clog.begin(currentResourceEventDebugInfo)
262 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
264 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
265 // But to make this decision, first we need the resource definition (and its cost policy).
266 val dslResourceOpt = resourcesMap.findResource(theResource)
267 dslResourceOpt match {
268 // We have a resource (and thus a cost policy)
269 case Some(dslResource) ⇒
270 val costPolicy = dslResource.costPolicy
271 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
272 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
274 // The resource event is not billable
275 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
277 // The resource event is billable
278 // Find the previous event.
279 // This is (potentially) needed to calculate new credit amount and new resource instance amount
280 val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
281 clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
283 val havePreviousResourceEvent = previousResourceEventOpt.isDefined
284 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
285 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
286 // This must be the first resource event of its kind, ever.
287 // TODO: We should normally check the DB to verify the claim (?)
288 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
289 userStateWorker.updateIgnored(currentResourceEvent)
291 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
292 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
293 val oldCredits = _workingUserState.creditsSnapshot.creditAmount
295 // A. Compute new resource instance accumulating amount
296 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
298 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
300 // B. Compute new wallet entries
301 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
302 val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
304 // clog.debug("Computing full chargeslots")
305 val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
306 previousResourceEventOpt,
307 currentResourceEvent,
319 // We have the chargeslots, let's associate them with the current event
320 if(fullChargeslots.length == 0) {
321 // At least one chargeslot is required.
322 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
324 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
326 // C. Compute new credit amount (based on the charge slots)
327 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
328 val newCredits = oldCredits - newCreditsDiff
330 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
331 val newWalletEntry = NewWalletEntry(
332 userStateWorker.userId,
336 TimeHelpers.nowMillis(),
338 billingMonthInfo.year,
339 billingMonthInfo.month,
340 if(havePreviousResourceEvent)
341 List(currentResourceEvent, previousResourceEventOpt.get)
343 List(currentResourceEvent),
346 currentResourceEvent.isSynthetic
348 clog.debug("New %s", newWalletEntry)
350 walletEntriesBuffer += newWalletEntry
352 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
355 _workingUserState = _workingUserState.copy(
356 creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
357 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
358 totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
363 // After processing, all events billable or not update the previous state
364 userStateWorker.updatePrevious(currentResourceEvent)
366 _workingUserState = _workingUserState.copy(
367 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
370 // We do not have a resource (and thus, no cost policy)
372 // Now, this is a matter of politics: what do we do if no policy was found?
373 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
374 } // dslResourceOpt match
376 clog.end(currentResourceEventDebugInfo)
381 def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
382 startingUserState: UserState,
383 userStateWorker: UserStateWorker,
384 policyStore: PolicyStore,
385 stateChangeReason: UserStateChangeReason,
386 billingMonthInfo: BillingMonthInfo,
387 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
388 algorithmCompiler: CostPolicyAlgorithmCompiler,
389 clogOpt: Option[ContextualLogger] = None): UserState = {
391 var _workingUserState = startingUserState
393 for(currentResourceEvent ← resourceEvents) {
395 _workingUserState = processResourceEvent(
398 currentResourceEvent,
412 def doFullMonthlyBilling(userId: String,
413 billingMonthInfo: BillingMonthInfo,
414 storeProvider: StoreProvider,
415 currentUserState: UserState,
416 defaultResourcesMap: DSLResourcesMap,
417 accounting: Accounting,
418 algorithmCompiler: CostPolicyAlgorithmCompiler,
419 calculationReason: UserStateChangeReason = NoSpecificChangeReason,
420 clogOpt: Option[ContextualLogger] = None): UserState = {
423 val clog = ContextualLogger.fromOther(
426 "doFullMonthlyBilling(%s)", billingMonthInfo)
429 val clogSome = Some(clog)
431 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
433 billingMonthInfo.previousMonth,
439 calculationReason.forPreviousBillingMonth,
443 val startingUserState = previousBillingMonthUserState
445 val userStateStore = storeProvider.userStateStore
446 val resourceEventStore = storeProvider.resourceEventStore
447 val policyStore = storeProvider.policyStore
449 val billingMonthStartMillis = billingMonthInfo.startMillis
450 val billingMonthEndMillis = billingMonthInfo.stopMillis
452 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
453 // specified in the parameters.
454 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
455 var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
457 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
459 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
461 // First, find and process the actual resource events from DB
462 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
464 billingMonthStartMillis,
465 billingMonthEndMillis)
467 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
469 _workingUserState = processResourceEvents(
470 allResourceEventsForMonth,
481 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
482 // ... in order to generate an implicit ON later
483 val (specialEvents, theirImplicitEnds) = userStateWorker.
484 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
485 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
487 clog.debug("Process implicitly issued events")
488 clog.debugSeq("specialEvents", specialEvents, 0)
489 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
492 // Now, the previous and implicitly started must be our base for the following computation, so we create an
493 // appropriate worker
494 val specialUserStateWorker = UserStateWorker(
495 userStateWorker.userId,
496 LatestResourceEventsWorker.fromList(specialEvents),
497 ImplicitlyIssuedResourceEventsWorker.Empty,
498 IgnoredFirstResourceEventsWorker.Empty,
499 userStateWorker.accounting,
500 userStateWorker.resourcesMap
503 _workingUserState = processResourceEvents(
506 specialUserStateWorker,
515 val lastUpdateTime = TimeHelpers.nowMillis()
517 _workingUserState = _workingUserState.copy(
518 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
519 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
520 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
521 parentUserStateId = startingUserState.idOpt,
522 newWalletEntries = newWalletEntries.toList
525 clog.debug("calculationReason = %s", calculationReason)
527 if(calculationReason.shouldStoreUserState) {
528 val storedUserState = userStateStore.insertUserState(_workingUserState)
529 clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
530 _workingUserState = storedUserState
533 clog.debug("RETURN %s", _workingUserState)
540 * A helper object holding intermediate state/results during resource event processing.
542 * @param previousResourceEvents
543 * This is a collection of all the latest resource events.
544 * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
545 * ones. Will be updated on processing the next resource event.
547 * @param implicitlyIssuedStartEvents
548 * The implicitly issued resource events at the beginning of the billing period.
550 * @param ignoredFirstResourceEvents
551 * The resource events that were first (and unused) of their kind.
553 * @author Christos KK Loverdos <loverdos@gmail.com>
555 case class UserStateWorker(userId: String,
556 previousResourceEvents: LatestResourceEventsWorker,
557 implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
558 ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
559 accounting: Accounting,
560 resourcesMap: DSLResourcesMap) {
563 * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
564 * events and b) the explicit previous resource events. If the event is found, it is removed from the
567 * If the event is not found, then this must be for a new resource instance.
568 * (and probably then some `zero` resource event must be implied as the previous one)
574 def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
575 // implicitly issued events are checked first
576 implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
577 case some @ Some(_) ⇒
580 // explicit previous resource events are checked second
581 previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
582 case some @ Some(_) ⇒
590 def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
591 ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
594 def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
595 previousResourceEvents.updateResourceEvent(resourceEvent)
598 def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
599 if(previousResourceEvents.size > 0) {
600 val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
601 clog.debugMap("previousResourceEvents", map, 0)
603 if(implicitlyIssuedStartEvents.size > 0) {
604 val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
605 clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
607 if(ignoredFirstResourceEvents.size > 0) {
608 val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
609 clog.debugMap("ignoredFirstResourceEvents", map, 0)
614 // def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
615 // val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
617 // buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
618 // buffer ++= previousResourceEvents.latestEventsMap
620 // buffer.valuesIterator.toList
624 * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
625 * end events along with those implicitly issued events. Before returning, remove the events that generated the
626 * implicit ends from the internal state of this instance.
628 * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
630 def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
631 ): (List[ResourceEventModel], List[ResourceEventModel]) = {
632 val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
633 val checkSet = mutable.Set[ResourceEventModel]()
635 def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
636 val resourceEvents = map.valuesIterator
638 resourceEvent ← resourceEvents
639 dslResource ← resourcesMap.findResource(resourceEvent.safeResource)
640 costPolicy = dslResource.costPolicy
642 if(costPolicy.supportsImplicitEvents) {
643 if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
644 val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
646 if(!checkSet.contains(resourceEvent)) {
647 checkSet.add(resourceEvent)
648 buffer append ((resourceEvent, implicitEnd))
652 map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
658 doItFor(previousResourceEvents.latestEventsMap) // we give priority for previous
659 doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
661 (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
665 object UserStateWorker {
666 def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
669 userState.latestResourceEventsSnapshot.toMutableWorker,
670 userState.implicitlyIssuedSnapshot.toMutableWorker,
671 IgnoredFirstResourceEventsWorker.Empty,