Add two more REST internal calls
[aquarium] / src / main / scala / gr / grnet / aquarium / computation / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.computation
37
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.computation.state.parts._
43 import gr.grnet.aquarium.event.model.NewWalletEntry
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
46 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
47 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
48
49 /**
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>
52  */
53 final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
54
55   lazy val aquarium = _aquarium
56
57   lazy val storeProvider         = aquarium.storeProvider
58   lazy val timeslotComputations  = new TimeslotComputations {}
59   lazy val algorithmCompiler     = aquarium.algorithmCompiler
60   lazy val policyStore           = storeProvider.policyStore
61   lazy val userStateStoreForRead = storeProvider.userStateStore
62   lazy val resourceEventStore    = storeProvider.resourceEventStore
63
64   def findUserStateAtEndOfBillingMonth(
65       userStateBootstrap: UserStateBootstrap,
66       billingMonthInfo: BillingMonthInfo,
67       defaultResourcesMap: DSLResourcesMap,
68       calculationReason: UserStateChangeReason,
69       storeFunc: UserState ⇒ UserState,
70       clogOpt: Option[ContextualLogger] = None
71   ): UserState = {
72
73     val clog = ContextualLogger.fromOther(
74       clogOpt,
75       logger,
76       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
77     clog.begin()
78
79     def computeFullMonthBillingAndSaveState(): UserState = {
80       val userState0 = doFullMonthBilling(
81         userStateBootstrap,
82         billingMonthInfo,
83         defaultResourcesMap,
84         calculationReason,
85         storeFunc,
86         Some(clog)
87       )
88
89       // We always save the state when it is a full month billing
90       val userState1 = storeFunc.apply(
91         userState0.newWithChangeReason(
92           MonthlyBillingCalculation(calculationReason, billingMonthInfo))
93       )
94
95       clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, userState1.toJsonString)
96       userState1
97     }
98
99     val userID = userStateBootstrap.userID
100     val userCreationMillis = userStateBootstrap.userCreationMillis
101     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
102     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
103     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
104
105     if(billingMonthStopMillis < userCreationMillis) {
106       // If the user did not exist for this billing month, piece of cake
107       clog.debug("User did not exist before %s", userCreationDateCalc)
108
109       // TODO: The initial user state might have already been created.
110       //       First ask if it exists and compute only if not
111       val initialUserState0 = UserState.createInitialUserStateFromBootstrap(
112         userStateBootstrap,
113         TimeHelpers.nowMillis(),
114         InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
115       )
116
117       // We always save the initial state
118       val initialUserState1 = storeFunc.apply(initialUserState0)
119
120       clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
121       clog.end()
122
123       return initialUserState1
124     }
125
126     // Ask DB cache for the latest known user state for this billing period
127     val latestUserStateOpt = userStateStoreForRead.findLatestUserStateForFullMonthBilling(
128       userID,
129       billingMonthInfo)
130
131     latestUserStateOpt match {
132       case None ⇒
133         // Not found, must compute
134         clog.debug("No user state found from cache, will have to (re)compute")
135         val result = computeFullMonthBillingAndSaveState
136         clog.end()
137         result
138
139       case Some(latestUserState) ⇒
140         // Found a "latest" user state but need to see if it is indeed the true and one latest.
141         // For this reason, we must count the events again.
142         val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
143         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
144           userID,
145           billingMonthStartMillis,
146           billingMonthStopMillis)
147
148         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
149         counterDiff match {
150           // ZERO, we are OK!
151           case 0 ⇒
152             // NOTE: Keep the caller's calculation reason
153             val result = latestUserState.newWithChangeReason(calculationReason)
154             clog.end()
155             result
156
157           // We had more, so must recompute
158           case n if n > 0 ⇒
159             clog.debug(
160               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
161             val result = computeFullMonthBillingAndSaveState
162             clog.end()
163             result
164
165           // We had less????
166           case n if n < 0 ⇒
167             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
168             clog.warn(errMsg)
169             throw new AquariumInternalError(errMsg)
170         }
171     }
172   }
173
174   //+ Utility methods
175   protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
176     rcEvent.toDebugString
177   }
178   //- Utility methods
179
180   def processResourceEvent(
181       startingUserState: UserState,
182       userStateWorker: UserStateWorker,
183       currentResourceEvent: ResourceEventModel,
184       stateChangeReason: UserStateChangeReason,
185       billingMonthInfo: BillingMonthInfo,
186       walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
187       clogOpt: Option[ContextualLogger] = None
188   ): UserState = {
189
190     val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
191
192     var _workingUserState = startingUserState
193
194     val theResource = currentResourceEvent.safeResource
195     val theInstanceId = currentResourceEvent.safeInstanceId
196     val theValue = currentResourceEvent.value
197
198     val resourcesMap = userStateWorker.resourcesMap
199
200     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
201     clog.begin(currentResourceEventDebugInfo)
202
203     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
204
205     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
206     // But to make this decision, first we need the resource definition (and its cost policy).
207     val dslResourceOpt = resourcesMap.findResource(theResource)
208     dslResourceOpt match {
209       // We have a resource (and thus a cost policy)
210       case Some(dslResource) ⇒
211         val costPolicy = dslResource.costPolicy
212         clog.debug("Cost policy %s for %s", costPolicy, dslResource)
213         val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
214         if(!isBillable) {
215           // The resource event is not billable
216           clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
217         } else {
218           // The resource event is billable
219           // Find the previous event.
220           // This is (potentially) needed to calculate new credit amount and new resource instance amount
221           val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
222           clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
223
224           val havePreviousResourceEvent = previousResourceEventOpt.isDefined
225           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
226           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
227             // This must be the first resource event of its kind, ever.
228             // TODO: We should normally check the DB to verify the claim (?)
229             clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
230             userStateWorker.updateIgnored(currentResourceEvent)
231           } else {
232             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
233             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
234             val oldCredits = _workingUserState.totalCredits
235
236             // A. Compute new resource instance accumulating amount
237             val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
238
239             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
240
241             // B. Compute new wallet entries
242             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
243             val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
244
245             //              clog.debug("Computing full chargeslots")
246             val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
247               previousResourceEventOpt,
248               currentResourceEvent,
249               oldCredits,
250               oldAmount,
251               newAmount,
252               dslResource,
253               resourcesMap,
254               alltimeAgreements,
255               algorithmCompiler,
256               policyStore,
257               Some(clog)
258             )
259
260             // We have the chargeslots, let's associate them with the current event
261             if(fullChargeslots.length == 0) {
262               // At least one chargeslot is required.
263               throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
264             }
265             clog.debugSeq("fullChargeslots", fullChargeslots, 0)
266
267             // C. Compute new credit amount (based on the charge slots)
268             val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
269             val newCredits = oldCredits - newCreditsDiff
270
271             if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
272               val newWalletEntry = NewWalletEntry(
273                 userStateWorker.userID,
274                 newCreditsDiff,
275                 oldCredits,
276                 newCredits,
277                 TimeHelpers.nowMillis(),
278                 referenceTimeslot,
279                 billingMonthInfo.year,
280                 billingMonthInfo.month,
281                 if(havePreviousResourceEvent)
282                   List(currentResourceEvent, previousResourceEventOpt.get)
283                 else
284                   List(currentResourceEvent),
285                 fullChargeslots,
286                 dslResource,
287                 currentResourceEvent.isSynthetic
288               )
289               clog.debug("New %s", newWalletEntry)
290
291               walletEntriesBuffer += newWalletEntry
292             } else {
293               clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
294             }
295
296             _workingUserState = _workingUserState.copy(
297               totalCredits = newCredits,
298               stateChangeCounter = _workingUserState.stateChangeCounter + 1
299             )
300           }
301         }
302
303         // After processing, all events billable or not update the previous state
304         userStateWorker.updatePrevious(currentResourceEvent)
305
306         _workingUserState = _workingUserState.copy(
307           latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
308         )
309
310       // We do not have a resource (and thus, no cost policy)
311       case None ⇒
312         // Now, this is a matter of politics: what do we do if no policy was found?
313         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
314     } // dslResourceOpt match
315
316     clog.end(currentResourceEventDebugInfo)
317
318     _workingUserState
319   }
320
321   def processResourceEvents(
322       resourceEvents: Traversable[ResourceEventModel],
323       startingUserState: UserState,
324       userStateWorker: UserStateWorker,
325       stateChangeReason: UserStateChangeReason,
326       billingMonthInfo: BillingMonthInfo,
327       walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
328       clogOpt: Option[ContextualLogger] = None
329   ): UserState = {
330
331     var _workingUserState = startingUserState
332
333     for(currentResourceEvent ← resourceEvents) {
334
335       _workingUserState = processResourceEvent(
336         _workingUserState,
337         userStateWorker,
338         currentResourceEvent,
339         stateChangeReason,
340         billingMonthInfo,
341         walletEntriesBuffer,
342         clogOpt
343       )
344     }
345
346     _workingUserState
347   }
348
349   def doFullMonthBilling(
350       userStateBootstrap: UserStateBootstrap,
351       billingMonthInfo: BillingMonthInfo,
352       defaultResourcesMap: DSLResourcesMap,
353       calculationReason: UserStateChangeReason,
354       storeFunc: UserState ⇒ UserState,
355       clogOpt: Option[ContextualLogger] = None
356   ): UserState = {
357
358     doMonthBillingUpTo(
359       billingMonthInfo,
360       billingMonthInfo.monthStopMillis,
361       userStateBootstrap,
362       defaultResourcesMap,
363       calculationReason,
364       storeFunc,
365       clogOpt
366     )
367   }
368
369   def doMonthBillingUpTo(
370       /**
371        * Which month to bill.
372        */
373       billingMonthInfo: BillingMonthInfo,
374       /**
375        * Bill from start of month up to (and including) this time.
376        */
377       billingEndTimeMillis: Long,
378       userStateBootstrap: UserStateBootstrap,
379       defaultResourcesMap: DSLResourcesMap,
380       calculationReason: UserStateChangeReason,
381       storeFunc: UserState ⇒ UserState,
382       clogOpt: Option[ContextualLogger] = None
383   ): UserState = {
384
385     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
386     val userID = userStateBootstrap.userID
387
388     val clog = ContextualLogger.fromOther(
389       clogOpt,
390       logger,
391       "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
392     clog.begin()
393
394     clog.debug("calculationReason = %s", calculationReason)
395
396     val clogSome = Some(clog)
397
398     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
399       userStateBootstrap,
400       billingMonthInfo.previousMonth,
401       defaultResourcesMap,
402       calculationReason,
403       storeFunc,
404       clogSome
405     )
406
407     val startingUserState = previousBillingMonthUserState
408
409     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
410     // specified in the parameters.
411     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
412     var _workingUserState = startingUserState.newWithChangeReason(calculationReason)
413
414     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
415
416     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
417
418     // First, find and process the actual resource events from DB
419     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
420       userID,
421       billingMonthInfo.monthStartMillis, // from start of month
422       billingEndTimeMillis               // to requested time
423     )
424
425     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
426
427     _workingUserState = processResourceEvents(
428       allResourceEventsForMonth,
429       _workingUserState,
430       userStateWorker,
431       calculationReason,
432       billingMonthInfo,
433       newWalletEntries,
434       clogSome
435     )
436
437     if(isFullMonthBilling) {
438       // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
439       // ... in order to generate an implicit ON later (during the next billing cycle).
440       val (specialEvents, theirImplicitEnds) = userStateWorker.
441         findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)
442
443       if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
444         clog.debug("")
445         clog.debug("Process implicitly issued events")
446         clog.debugSeq("specialEvents", specialEvents, 0)
447         clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
448       }
449
450       // Now, the previous and implicitly started must be our base for the following computation, so we create an
451       // appropriate worker
452       val specialUserStateWorker = UserStateWorker(
453         userStateWorker.userID,
454         LatestResourceEventsWorker.fromList(specialEvents),
455         ImplicitlyIssuedResourceEventsWorker.Empty,
456         IgnoredFirstResourceEventsWorker.Empty,
457         userStateWorker.resourcesMap
458       )
459
460       _workingUserState = processResourceEvents(
461         theirImplicitEnds,
462         _workingUserState,
463         specialUserStateWorker,
464         calculationReason,
465         billingMonthInfo,
466         newWalletEntries,
467         clogSome
468       )
469     }
470
471     val lastUpdateTime = TimeHelpers.nowMillis()
472
473     _workingUserState = _workingUserState.copy(
474       isFullBillingMonthState = isFullMonthBilling,
475
476       theFullBillingMonth = if(isFullMonthBilling)
477         Some(billingMonthInfo)
478       else
479         _workingUserState.theFullBillingMonth,
480
481       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
482
483       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
484
485       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
486
487       parentUserStateIDInStore = startingUserState.idInStore,
488
489       newWalletEntries = newWalletEntries.toList
490     )
491
492     clog.end()
493     _workingUserState
494   }
495 }