WIP Resource event handling
[aquarium] / src / main / scala / gr / grnet / aquarium / computation / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.computation
37
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.computation.state.parts._
43 import gr.grnet.aquarium.event.model.NewWalletEntry
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
46 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
47 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
48
/**
 * User state computations: finding (or computing) the user state at the end of a
 * billing month, and applying resource events to a working user state.
 *
 * The `Aquarium` instance is passed by name and is only resolved on first use,
 * through the lazy `aquarium` member. All stores and helpers below are derived
 * lazily from it.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {

  lazy val aquarium = _aquarium

  lazy val storeProvider         = aquarium.storeProvider
  lazy val timeslotComputations  = new TimeslotComputations {}
  lazy val algorithmCompiler     = aquarium.algorithmCompiler
  lazy val policyStore           = storeProvider.policyStore
  lazy val userStateStoreForRead = storeProvider.userStateStore
  lazy val resourceEventStore    = storeProvider.resourceEventStore

  /**
   * Returns the user state at the end of the given billing month.
   *
   * Three situations are handled:
   *  - The billing month ends before the user was created: an initial user state is
   *    created from the bootstrap data, stored via `storeFunc` and returned.
   *  - No stored full-month state exists, or more out-of-sync resource events exist
   *    now than the stored state has counted: the month is billed again in full and
   *    the result is stored via `storeFunc`.
   *  - The stored state agrees with the current out-of-sync event count: it is
   *    returned (not stored again), carrying the caller's `calculationReason`.
   *
   * The logging context opened here is closed on every normal exit. When an
   * exception propagates, the context is left open.
   *
   * @param userStateBootstrap  bootstrap data; provides the user id and creation time
   * @param billingMonthInfo    the billing month whose end-of-month state is requested
   * @param defaultResourcesMap resources map handed over to the billing computation
   * @param calculationReason   the reason recorded on the returned state
   * @param storeFunc           persists a user state and returns the stored version
   * @param clogOpt             optional parent logger to derive the contextual logger from
   * @return the user state at the end of `billingMonthInfo`
   * @throws AquariumInternalError if fewer out-of-sync events are found than the
   *                               stored state has already counted
   */
  def findUserStateAtEndOfBillingMonth(
      userStateBootstrap: UserStateBootstrap,
      billingMonthInfo: BillingMonthInfo,
      defaultResourcesMap: DSLResourcesMap,
      calculationReason: UserStateChangeReason,
      storeFunc: UserState ⇒ UserState,
      clogOpt: Option[ContextualLogger] = None
  ): UserState = {

    val clog = ContextualLogger.fromOther(
      clogOpt,
      logger,
      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
    clog.begin()

    // Bills the whole month and always persists the outcome.
    def computeFullMonthBillingAndSaveState(): UserState = {
      val billedUserState = doFullMonthBilling(
        userStateBootstrap,
        billingMonthInfo,
        defaultResourcesMap,
        calculationReason,
        storeFunc,
        Some(clog)
      )

      // We always save the state when it is a full month billing
      val storedUserState = storeFunc(
        billedUserState.newWithChangeReason(
          MonthlyBillingCalculation(calculationReason, billingMonthInfo))
      )

      clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, storedUserState.toJsonString)
      storedUserState
    }

    val userID = userStateBootstrap.userID
    val userCreationMillis = userStateBootstrap.userCreationMillis
    val billingMonthStartMillis = billingMonthInfo.monthStartMillis
    val billingMonthStopMillis = billingMonthInfo.monthStopMillis

    val userStateAtEndOfMonth =
      if(billingMonthStopMillis < userCreationMillis) {
        // The user did not exist during this billing month, so there is nothing to bill.
        clog.debug("User did not exist before %s", new MutableDateCalc(userCreationMillis))

        // TODO: The initial user state might have already been created.
        //       First ask if it exists and compute only if not
        val initialUserState = UserState.createInitialUserStateFromBootstrap(
          userStateBootstrap,
          TimeHelpers.nowMillis(),
          InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
        )

        // We always save the initial state
        val storedInitialUserState = storeFunc(initialUserState)
        clog.debug("Stored initial state = %s", storedInitialUserState.toJsonString)

        storedInitialUserState
      } else {
        // Ask DB cache for the latest known user state for this billing period
        userStateStoreForRead.findLatestUserStateForFullMonthBilling(userID, billingMonthInfo) match {
          case None ⇒
            // Not found, must compute
            clog.debug("No user state found from cache, will have to (re)compute")
            computeFullMonthBillingAndSaveState()

          case Some(latestUserState) ⇒
            // A "latest" state was found, but it is only valid if no further out-of-sync
            // events arrived since it was computed. So the events are counted again.
            val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
            val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
              userID,
              billingMonthStartMillis,
              billingMonthStopMillis)

            val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter

            if(counterDiff == 0) {
              // The stored state is up to date.
              // NOTE: Keep the caller's calculation reason
              latestUserState.newWithChangeReason(calculationReason)
            } else if(counterDiff > 0) {
              // More events than the stored state knows about, so must recompute
              clog.debug(
                "Found %s out of sync events (%s more), will have to (re)compute user state",
                actualOOSEventsCounter,
                counterDiff)
              computeFullMonthBillingAndSaveState()
            } else {
              // Fewer events than the stored state has counted: this should not happen.
              val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(
                actualOOSEventsCounter,
                counterDiff)
              clog.warn(errMsg)
              throw new AquariumInternalError(errMsg)
            }
        }
      }

    // Single closing point for all normal exits, including the "stored state is up
    // to date" path which previously returned without closing the context.
    clog.end()
    userStateAtEndOfMonth
  }

  //+ Utility methods
  /**
   * The debug representation of a resource event, as used in log messages.
   */
  protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
    rcEvent.toDebugString
  }
  //- Utility methods

  /**
   * Applies one resource event to `startingUserState` and returns the resulting state.
   *
   *  - Unknown resource: a warning is logged and the starting state is returned as is.
   *  - Known resource, event not billable by its value: no credits change.
   *  - Known resource, billable, but the cost policy needs a previous event and none is
   *    known: the event is registered as ignored via `userStateWorker.updateIgnored`.
   *  - Otherwise: chargeslots are computed, the total credits are reduced by their sum
   *    and the state change counter is incremented.
   *
   * For every known resource the event is also recorded through
   * `userStateWorker.updatePrevious` and the latest resource events snapshot of the
   * returned state is refreshed.
   *
   * @param startingUserState    the state to start from
   * @param userStateWorker      working data: resources map, previous and ignored events
   * @param currentResourceEvent the event to process
   * @param stateChangeReason    decides (via `shouldStoreCalculatedWalletEntries`) whether
   *                             a wallet entry is appended to `walletEntriesBuffer`
   * @param billingMonthInfo     provides the year and month recorded on a new wallet entry
   * @param walletEntriesBuffer  receives the newly created wallet entry, if any
   * @param clogOpt              optional parent logger to derive the contextual logger from
   * @return the user state after the event has been applied
   * @throws AquariumInternalError if no chargeslot is computed for a billable event
   */
  def processResourceEvent(
      startingUserState: UserState,
      userStateWorker: UserStateWorker,
      currentResourceEvent: ResourceEventModel,
      stateChangeReason: UserStateChangeReason,
      billingMonthInfo: BillingMonthInfo,
      walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
      clogOpt: Option[ContextualLogger] = None
  ): UserState = {

    val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)

    val theResource = currentResourceEvent.safeResource
    val theInstanceId = currentResourceEvent.safeInstanceId
    val theValue = currentResourceEvent.value

    val resourcesMap = userStateWorker.resourcesMap

    val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
    clog.begin(currentResourceEventDebugInfo)

    userStateWorker.debugTheMaps(clog)(rcDebugInfo)

    // Whether an event is billable is decided by the cost policy of its resource,
    // so the resource definition has to be looked up first.
    val resultingUserState = resourcesMap.findResource(theResource) match {
      // We have a resource (and thus a cost policy)
      case Some(dslResource) ⇒
        val costPolicy = dslResource.costPolicy
        clog.debug("Cost policy %s for %s", costPolicy, dslResource)

        val userStateAfterBilling =
          if(!costPolicy.isBillableEventBasedOnValue(theValue)) {
            // The resource event is not billable
            clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
            startingUserState
          } else {
            // The resource event is billable.
            // The previous event is (potentially) needed to calculate the new credit
            // amount and the new resource instance amount.
            val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
            clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))

            val havePreviousResourceEvent = previousResourceEventOpt.isDefined
            val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation

            if(needPreviousResourceEvent && !havePreviousResourceEvent) {
              // This must be the first resource event of its kind, ever.
              // TODO: We should normally check the DB to verify the claim (?)
              clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
              userStateWorker.updateIgnored(currentResourceEvent)
              startingUserState
            } else {
              val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
              val oldAmount = startingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
              val oldCredits = startingUserState.totalCredits

              // A. Compute new resource instance accumulating amount
              // NOTE(review): newAmount is only fed into the chargeslot computation and
              //               the log below; it is not written back to the user state
              //               here. Confirm whether that is intended.
              val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)

              clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)

              // B. Compute new wallet entries
              clog.debug("agreementsSnapshot = %s", startingUserState.agreementHistory)
              val alltimeAgreements = startingUserState.agreementHistory.agreementNamesByTimeslot

              val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
                previousResourceEventOpt,
                currentResourceEvent,
                oldCredits,
                oldAmount,
                newAmount,
                dslResource,
                resourcesMap,
                alltimeAgreements,
                algorithmCompiler,
                policyStore,
                Some(clog)
              )

              // We have the chargeslots, let's associate them with the current event
              if(fullChargeslots.isEmpty) {
                // At least one chargeslot is required.
                throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
              }
              clog.debugSeq("fullChargeslots", fullChargeslots, 0)

              // C. Compute new credit amount (based on the charge slots)
              // NOTE(review): `.get` fails if a chargeslot carries no computed credits;
              //               presumably "full" chargeslots always have them. Confirm.
              val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
              val newCredits = oldCredits - newCreditsDiff

              if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
                val newWalletEntry = NewWalletEntry(
                  userStateWorker.userID,
                  newCreditsDiff,
                  oldCredits,
                  newCredits,
                  TimeHelpers.nowMillis(),
                  referenceTimeslot,
                  billingMonthInfo.year,
                  billingMonthInfo.month,
                  // The current event first, followed by the previous one when it exists
                  currentResourceEvent :: previousResourceEventOpt.toList,
                  fullChargeslots,
                  dslResource,
                  currentResourceEvent.isSynthetic
                )
                clog.debug("New %s", newWalletEntry)

                walletEntriesBuffer += newWalletEntry
              } else {
                clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
              }

              startingUserState.copy(
                totalCredits = newCredits,
                stateChangeCounter = startingUserState.stateChangeCounter + 1
              )
            }
          }

        // After processing, all events billable or not update the previous state
        userStateWorker.updatePrevious(currentResourceEvent)

        userStateAfterBilling.copy(
          latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
        )

      // We do not have a resource (and thus, no cost policy)
      case None ⇒
        // Now, this is a matter of politics: what do we do if no policy was found?
        clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
        startingUserState
    }

    clog.end(currentResourceEventDebugInfo)

    resultingUserState
  }

  /**
   * Applies the given resource events, in traversal order, to `startingUserState`.
   * Each event is processed by `processResourceEvent` on top of the state produced
   * by the previous one.
   *
   * @return the state after the last event, or `startingUserState` if there are no events
   */
  def processResourceEvents(
      resourceEvents: Traversable[ResourceEventModel],
      startingUserState: UserState,
      userStateWorker: UserStateWorker,
      stateChangeReason: UserStateChangeReason,
      billingMonthInfo: BillingMonthInfo,
      walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
      clogOpt: Option[ContextualLogger] = None
  ): UserState = {

    resourceEvents.foldLeft(startingUserState) { (workingUserState, currentResourceEvent) ⇒
      processResourceEvent(
        workingUserState,
        userStateWorker,
        currentResourceEvent,
        stateChangeReason,
        billingMonthInfo,
        walletEntriesBuffer,
        clogOpt
      )
    }
  }

  /**
   * Bills the whole of the given month, i.e. `doMonthBillingUpTo` with the end of
   * the month as the billing end time.
   */
  def doFullMonthBilling(
      userStateBootstrap: UserStateBootstrap,
      billingMonthInfo: BillingMonthInfo,
      defaultResourcesMap: DSLResourcesMap,
      calculationReason: UserStateChangeReason,
      storeFunc: UserState ⇒ UserState,
      clogOpt: Option[ContextualLogger] = None
  ): UserState = {

    doMonthBillingUpTo(
      billingMonthInfo,
      billingMonthInfo.monthStopMillis,
      userStateBootstrap,
      defaultResourcesMap,
      calculationReason,
      storeFunc,
      clogOpt
    )
  }

  /**
   * Bills the given month from its start up to `billingEndTimeMillis`.
   *
   * The computation starts from the user state at the end of the previous billing
   * month (obtained through `findUserStateAtEndOfBillingMonth`), then processes the
   * resource events the store reports as relevant for the period. When the end time
   * equals the end of the month, the implicit end events generated by the worker are
   * processed as well. The returned state is not stored by this method.
   *
   * @param billingMonthInfo     which month to bill
   * @param billingEndTimeMillis bill from start of month up to (and including) this time
   * @param userStateBootstrap   bootstrap data; provides the user id
   * @param defaultResourcesMap  resources map used to build the user state worker
   * @param calculationReason    the reason recorded on the working state
   * @param storeFunc            passed on to the previous-month state lookup
   * @param clogOpt              optional parent logger to derive the contextual logger from
   * @return the user state after billing, carrying the wallet entries created on the way
   */
  def doMonthBillingUpTo(
      billingMonthInfo: BillingMonthInfo,
      billingEndTimeMillis: Long,
      userStateBootstrap: UserStateBootstrap,
      defaultResourcesMap: DSLResourcesMap,
      calculationReason: UserStateChangeReason,
      storeFunc: UserState ⇒ UserState,
      clogOpt: Option[ContextualLogger] = None
  ): UserState = {

    val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
    val userID = userStateBootstrap.userID

    val clog = ContextualLogger.fromOther(
      clogOpt,
      logger,
      "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
    clog.begin()

    clog.debug("calculationReason = %s", calculationReason)

    val clogSome = Some(clog)

    val startingUserState = findUserStateAtEndOfBillingMonth(
      userStateBootstrap,
      billingMonthInfo.previousMonth,
      defaultResourcesMap,
      calculationReason,
      storeFunc,
      clogSome
    )

    // NOTE: The calculation reason is not the one we get from the previous user state
    //       but the one our caller specifies
    val initialWorkingUserState = startingUserState.newWithChangeReason(calculationReason)

    val userStateWorker = UserStateWorker.fromUserState(initialWorkingUserState, defaultResourcesMap)

    userStateWorker.debugTheMaps(clog)(rcDebugInfo)

    // First, find and process the actual resource events from DB
    val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
      userID,
      billingMonthInfo.monthStartMillis, // from start of month
      billingEndTimeMillis               // to requested time
    )

    // Collects the wallet entries of both processing passes below
    val newWalletEntries = mutable.ListBuffer[NewWalletEntry]()

    val userStateAfterStoredEvents = processResourceEvents(
      allResourceEventsForMonth,
      initialWorkingUserState,
      userStateWorker,
      calculationReason,
      billingMonthInfo,
      newWalletEntries,
      clogSome
    )

    val userStateAfterImplicitEvents =
      if(isFullMonthBilling) {
        // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
        // ... in order to generate an implicit ON later (during the next billing cycle).
        val (specialEvents, theirImplicitEnds) = userStateWorker.
          findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)

        if(specialEvents.nonEmpty || theirImplicitEnds.nonEmpty) {
          clog.debug("")
          clog.debug("Process implicitly issued events")
          clog.debugSeq("specialEvents", specialEvents, 0)
          clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
        }

        // The implicit ends must be billed against the events that generated them, so
        // a dedicated worker is created having exactly those events as "previous" ones.
        val specialUserStateWorker = UserStateWorker(
          userStateWorker.userID,
          LatestResourceEventsWorker.fromList(specialEvents),
          ImplicitlyIssuedResourceEventsWorker.Empty,
          IgnoredFirstResourceEventsWorker.Empty,
          userStateWorker.resourcesMap
        )

        processResourceEvents(
          theirImplicitEnds,
          userStateAfterStoredEvents,
          specialUserStateWorker,
          calculationReason,
          billingMonthInfo,
          newWalletEntries,
          clogSome
        )
      } else {
        userStateAfterStoredEvents
      }

    val lastUpdateTime = TimeHelpers.nowMillis()

    // The snapshots are taken from the main worker, not from the special one.
    val finalUserState = userStateAfterImplicitEvents.copy(
      isFullBillingMonthState = isFullMonthBilling,

      theFullBillingMonth = if(isFullMonthBilling)
        Some(billingMonthInfo)
      else
        userStateAfterImplicitEvents.theFullBillingMonth,

      implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),

      latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),

      stateChangeCounter = userStateAfterImplicitEvents.stateChangeCounter + 1,

      parentUserStateIDInStore = startingUserState.idInStore,

      newWalletEntries = newWalletEntries.toList
    )

    clog.end()
    finalUserState
  }
}