2d5f92d0128ecb6b54ec6449edc9f1ae1400d79b
[aquarium] / src / main / scala / gr / grnet / aquarium / computation / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.computation
37
38 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
39 import gr.grnet.aquarium.util.shortClassNameOf
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.computation.state.parts._
42 import gr.grnet.aquarium.event.model.NewWalletEntry
43 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
44 import gr.grnet.aquarium.{AquariumAwareSkeleton, AquariumInternalError}
45 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
46 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
47 import gr.grnet.aquarium.policy.ResourceType
48
49 /**
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>
52  */
53 final class UserStateComputations extends AquariumAwareSkeleton with Loggable {
54   lazy val timeslotComputations  = new TimeslotComputations {} // FIXME
55   lazy val policyStore           = aquarium.policyStore
56   lazy val userStateStoreForRead = aquarium.userStateStore
57   lazy val resourceEventStore    = aquarium.resourceEventStore
58
59   def findUserStateAtEndOfBillingMonth(
60       userStateBootstrap: UserStateBootstrap,
61       billingMonthInfo: BillingMonthInfo,
62       defaultResourceTypesMap: Map[String, ResourceType],
63       calculationReason: UserStateChangeReason,
64       storeFunc: UserState ⇒ UserState,
65       clogOpt: Option[ContextualLogger] = None
66   ): UserState = {
67
68     val clog = ContextualLogger.fromOther(
69       clogOpt,
70       logger,
71       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
72     clog.begin()
73
74     def computeFullMonthBillingAndSaveState(): UserState = {
75       val userState0 = doFullMonthBilling(
76         userStateBootstrap,
77         billingMonthInfo,
78         defaultResourceTypesMap,
79         calculationReason,
80         storeFunc,
81         Some(clog)
82       )
83
84       // We always save the state when it is a full month billing
85       val userState1 = storeFunc.apply(
86         userState0.newWithChangeReason(
87           MonthlyBillingCalculation(calculationReason, billingMonthInfo))
88       )
89
90       clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, userState1.toJsonString)
91       userState1
92     }
93
94     val userID = userStateBootstrap.userID
95     val userCreationMillis = userStateBootstrap.userCreationMillis
96     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
97     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
98     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
99
100     if(billingMonthStopMillis < userCreationMillis) {
101       // If the user did not exist for this billing month, piece of cake
102       clog.debug("User did not exist before %s", userCreationDateCalc)
103
104       // TODO: The initial user state might have already been created.
105       //       First ask if it exists and compute only if not
106       val initialUserState0 = UserState.createInitialUserStateFromBootstrap(
107         userStateBootstrap,
108         TimeHelpers.nowMillis(),
109         InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
110       )
111
112       // We always save the initial state
113       val initialUserState1 = storeFunc.apply(initialUserState0)
114
115       clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
116       clog.end()
117
118       return initialUserState1
119     }
120
121     // Ask DB cache for the latest known user state for this billing period
122     val latestUserStateOpt = userStateStoreForRead.findLatestUserStateForFullMonthBilling(
123       userID,
124       billingMonthInfo)
125
126     latestUserStateOpt match {
127       case None ⇒
128         // Not found, must compute
129         clog.debug("No user state found from cache, will have to (re)compute")
130         val result = computeFullMonthBillingAndSaveState
131         clog.end()
132         result
133
134       case Some(latestUserState) ⇒
135         // Found a "latest" user state but need to see if it is indeed the true and one latest.
136         // For this reason, we must count the events again.
137         val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
138         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
139           userID,
140           billingMonthStartMillis,
141           billingMonthStopMillis)
142
143         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
144         counterDiff match {
145           // ZERO, we are OK!
146           case 0 ⇒
147             // NOTE: Keep the caller's calculation reason
148             val result = latestUserState.newWithChangeReason(calculationReason)
149             clog.end()
150             result
151
152           // We had more, so must recompute
153           case n if n > 0 ⇒
154             clog.debug(
155               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
156             val result = computeFullMonthBillingAndSaveState
157             clog.end()
158             result
159
160           // We had less????
161           case n if n < 0 ⇒
162             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
163             clog.warn(errMsg)
164             throw new AquariumInternalError(errMsg)
165         }
166     }
167   }
168
169   //+ Utility methods
170   protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
171     rcEvent.toDebugString
172   }
173   //- Utility methods
174
175   def processResourceEvent(
176       startingUserState: UserState,
177       userStateWorker: UserStateWorker,
178       currentResourceEvent: ResourceEventModel,
179       stateChangeReason: UserStateChangeReason,
180       billingMonthInfo: BillingMonthInfo,
181       walletEntryRecorder: NewWalletEntry ⇒ Unit,
182       clogOpt: Option[ContextualLogger] = None
183   ): UserState = {
184
185     val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
186
187     var _workingUserState = startingUserState
188
189     val theResource = currentResourceEvent.safeResource
190     val theInstanceId = currentResourceEvent.safeInstanceId
191     val theValue = currentResourceEvent.value
192     val theDetails = currentResourceEvent.details
193
194     val resourceTypesMap = userStateWorker.resourceTypesMap
195
196     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
197     clog.begin(currentResourceEventDebugInfo)
198
199     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
200
201     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
202     // But to make this decision, first we need the resource type (and its charging behavior).
203     resourceTypesMap.get(theResource) match {
204       // We have a resource type (and thus a charging behavior)
205       case Some(resourceType) ⇒
206         val chargingBehavior = resourceType.chargingBehavior
207         clog.debug("%s for %s", chargingBehavior, resourceType)
208         val isBillable = chargingBehavior.isBillableEvent(currentResourceEvent)
209         if(!isBillable) {
210           // The resource event is not billable
211           clog.debug("Ignoring not billable %s", currentResourceEventDebugInfo)
212         } else {
213           // The resource event is billable
214           // Find the previous event.
215           // This is (potentially) needed to calculate new credit amount and new resource instance amount
216           val previousResourceEventOpt0 = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
217           clog.debug("PreviousM %s", previousResourceEventOpt0.map(rcDebugInfo(_)))
218
219           val havePreviousResourceEvent = previousResourceEventOpt0.isDefined
220           val needPreviousResourceEvent = chargingBehavior.needsPreviousEventForCreditAndAmountCalculation
221
222           val (proceed, previousResourceEventOpt1) = if(needPreviousResourceEvent && !havePreviousResourceEvent) {
223             // This must be the first resource event of its kind, ever.
224             // TODO: We should normally check the DB to verify the claim (?)
225
226             val actualFirstEvent = currentResourceEvent
227
228             if(chargingBehavior.isBillableFirstEvent(actualFirstEvent) &&
229               chargingBehavior.mustGenerateDummyFirstEvent) {
230
231               clog.debug("First event of its kind %s", currentResourceEventDebugInfo)
232
233               // OK. Let's see what the cost policy decides. If it must generate a dummy first event, we use that.
234               // Otherwise, the current event goes to the ignored list.
235               // The dummy first is considered to exist at the beginning of the billing period
236
237               val dummyFirst = chargingBehavior.constructDummyFirstEventFor(currentResourceEvent, billingMonthInfo.monthStartMillis)
238
239               clog.debug("Dummy first companion %s", rcDebugInfo(dummyFirst))
240
241               // proceed with charging???
242               (true, Some(dummyFirst))
243             } else {
244               clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
245               userStateWorker.updateIgnored(currentResourceEvent)
246               (false, None)
247             }
248           } else {
249             (true, previousResourceEventOpt0)
250           }
251
252           if(proceed) {
253             val defaultInitialAmount = chargingBehavior.getResourceInstanceInitialAmount
254             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
255             val oldCredits = _workingUserState.totalCredits
256
257             // A. Compute new resource instance accumulating amount
258             val newAccumulatingAmount = chargingBehavior.computeNewAccumulatingAmount(oldAmount, theValue, theDetails)
259
260             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAccumulatingAmount, oldCredits)
261
262             // B. Compute new wallet entries
263             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
264             val alltimeAgreements = _workingUserState.agreementHistory.agreementsByTimeslot
265
266             //              clog.debug("Computing full chargeslots")
267             val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
268               previousResourceEventOpt1,
269               currentResourceEvent,
270               oldCredits,
271               oldAmount,
272               newAccumulatingAmount,
273               resourceType,
274               alltimeAgreements,
275               policyStore,
276               Some(clog)
277             )
278
279             // We have the chargeslots, let's associate them with the current event
280             if(fullChargeslots.length == 0) {
281               // At least one chargeslot is required.
282               throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
283             }
284             clog.debugSeq("fullChargeslots", fullChargeslots, 0)
285
286             // C. Compute new credit amount (based on the charge slots)
287             val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
288             val newCredits = oldCredits - newCreditsDiff
289
290             val newWalletEntry = NewWalletEntry(
291               userStateWorker.userID,
292               newCreditsDiff,
293               oldCredits,
294               newCredits,
295               TimeHelpers.nowMillis(),
296               referenceTimeslot,
297               billingMonthInfo.year,
298               billingMonthInfo.month,
299               if(havePreviousResourceEvent)
300                 List(currentResourceEvent, previousResourceEventOpt1.get)
301               else
302                 List(currentResourceEvent),
303               fullChargeslots,
304               resourceType,
305               currentResourceEvent.isSynthetic
306             )
307             clog.debug("%s = %s", shortClassNameOf(newWalletEntry), newWalletEntry)
308
309             walletEntryRecorder.apply(newWalletEntry)
310
311             _workingUserState = _workingUserState.copy(
312               totalCredits = newCredits,
313               stateChangeCounter = _workingUserState.stateChangeCounter + 1
314             )
315           }
316         }
317
318         // After processing, all events billable or not update the previous state
319         userStateWorker.updatePrevious(currentResourceEvent)
320
321         _workingUserState = _workingUserState.copy(
322           latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
323         )
324
325       // We do not have a resource (and thus, no cost policy)
326       case None ⇒
327         // Now, this is a matter of politics: what do we do if no policy was found?
328         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
329     } // dslResourceOpt match
330
331     clog.end(currentResourceEventDebugInfo)
332
333     _workingUserState
334   }
335
336   def processResourceEvents(
337       resourceEvents: Traversable[ResourceEventModel],
338       startingUserState: UserState,
339       userStateWorker: UserStateWorker,
340       stateChangeReason: UserStateChangeReason,
341       billingMonthInfo: BillingMonthInfo,
342       walletEntryRecorder: NewWalletEntry ⇒ Unit,
343       clogOpt: Option[ContextualLogger] = None
344   ): UserState = {
345
346     var _workingUserState = startingUserState
347
348     for(currentResourceEvent ← resourceEvents) {
349
350       _workingUserState = processResourceEvent(
351         _workingUserState,
352         userStateWorker,
353         currentResourceEvent,
354         stateChangeReason,
355         billingMonthInfo,
356         walletEntryRecorder,
357         clogOpt
358       )
359     }
360
361     _workingUserState
362   }
363
364   def doFullMonthBilling(
365       userStateBootstrap: UserStateBootstrap,
366       billingMonthInfo: BillingMonthInfo,
367       defaultResourceTypesMap: Map[String, ResourceType],
368       calculationReason: UserStateChangeReason,
369       storeFunc: UserState ⇒ UserState,
370       clogOpt: Option[ContextualLogger] = None
371   ): UserState = {
372
373     doMonthBillingUpTo(
374       billingMonthInfo,
375       billingMonthInfo.monthStopMillis,
376       userStateBootstrap,
377       defaultResourceTypesMap,
378       calculationReason,
379       storeFunc,
380       clogOpt
381     )
382   }
383
384   def doMonthBillingUpTo(
385       /**
386        * Which month to bill.
387        */
388       billingMonthInfo: BillingMonthInfo,
389       /**
390        * Bill from start of month up to (and including) this time.
391        */
392       billingEndTimeMillis: Long,
393       userStateBootstrap: UserStateBootstrap,
394       defaultResourceTypesMap: Map[String, ResourceType],
395       calculationReason: UserStateChangeReason,
396       storeFunc: UserState ⇒ UserState,
397       clogOpt: Option[ContextualLogger] = None
398   ): UserState = {
399
400     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
401     val userID = userStateBootstrap.userID
402
403     val clog = ContextualLogger.fromOther(
404       clogOpt,
405       logger,
406       "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
407     clog.begin()
408
409     clog.debug("%s", calculationReason)
410
411     val clogSome = Some(clog)
412
413     val previousBillingMonthInfo = billingMonthInfo.previousMonth
414     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
415       userStateBootstrap,
416       previousBillingMonthInfo,
417       defaultResourceTypesMap,
418       calculationReason,
419       storeFunc,
420       clogSome
421     )
422
423     clog.debug("previousBillingMonthUserState(%s) = %s".format(
424       previousBillingMonthInfo.toShortDebugString,
425       previousBillingMonthUserState.toJsonString))
426
427     val startingUserState = previousBillingMonthUserState
428
429     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
430     // specified in the parameters.
431     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
432     var _workingUserState = startingUserState.newWithChangeReason(calculationReason)
433
434     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourceTypesMap)
435
436     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
437
438     // First, find and process the actual resource events from DB
439     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
440     val walletEntryRecorder = (nwe: NewWalletEntry) ⇒ {
441       newWalletEntries.append(nwe)
442     }
443
444     var _rcEventsCounter = 0
445     resourceEventStore.foreachResourceEventOccurredInPeriod(
446       userID,
447       billingMonthInfo.monthStartMillis, // from start of month
448       billingEndTimeMillis               // to requested time
449     ) { currentResourceEvent ⇒
450
451       clog.debug("Processing %s".format(currentResourceEvent))
452
453       _workingUserState = processResourceEvent(
454         _workingUserState,
455         userStateWorker,
456         currentResourceEvent,
457         calculationReason,
458         billingMonthInfo,
459         walletEntryRecorder,
460         clogSome
461       )
462
463       _rcEventsCounter += 1
464     }
465
466     clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString))
467
468     if(isFullMonthBilling) {
469       // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
470       // ... in order to generate an implicit ON later (during the next billing cycle).
471       val (specialEvents, theirImplicitEnds) = userStateWorker.
472         findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)
473
474       if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
475         clog.debug("")
476         clog.debug("Process implicitly issued events")
477         clog.debugSeq("specialEvents", specialEvents, 0)
478         clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
479       }
480
481       // Now, the previous and implicitly started must be our base for the following computation, so we create an
482       // appropriate worker
483       val specialUserStateWorker = UserStateWorker(
484         userStateWorker.userID,
485         LatestResourceEventsWorker.fromList(specialEvents),
486         ImplicitlyIssuedResourceEventsWorker.Empty,
487         IgnoredFirstResourceEventsWorker.Empty,
488         userStateWorker.resourceTypesMap
489       )
490
491       _workingUserState = processResourceEvents(
492         theirImplicitEnds,
493         _workingUserState,
494         specialUserStateWorker,
495         calculationReason,
496         billingMonthInfo,
497         walletEntryRecorder,
498         clogSome
499       )
500     }
501
502     val lastUpdateTime = TimeHelpers.nowMillis()
503
504     _workingUserState = _workingUserState.copy(
505       isFullBillingMonthState = isFullMonthBilling,
506
507       theFullBillingMonth = if(isFullMonthBilling)
508         Some(billingMonthInfo)
509       else
510         _workingUserState.theFullBillingMonth,
511
512       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
513
514       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
515
516       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
517
518       parentUserStateIDInStore = startingUserState.idInStore,
519
520       newWalletEntries = newWalletEntries.toList
521     )
522
523     clog.end()
524     _workingUserState
525   }
526 }