WIP Resource event handling
[aquarium] / src / main / scala / gr / grnet / aquarium / computation / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.computation
37
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.computation.data._
44 import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason}
45 import gr.grnet.aquarium.event.model.NewWalletEntry
46 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
47 import gr.grnet.aquarium.{Aquarium, AquariumInternalError, AquariumException}
48
49 /**
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>
52  */
53 class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
54
55   protected lazy val aquarium           = _aquarium()
56   protected lazy val storeProvider      = aquarium.storeProvider
57   protected lazy val accounting         = new Accounting {}
58   protected lazy val algorithmCompiler  = aquarium.algorithmCompiler
59   protected lazy val policyStore        = storeProvider.policyStore
60   protected lazy val userStateStore     = storeProvider.userStateStore
61   protected lazy val resourceEventStore = storeProvider.resourceEventStore
62
63   def findUserStateAtEndOfBillingMonth(userId: String,
64                                        billingMonthInfo: BillingMonthInfo,
65                                        currentUserState: UserState,
66                                        defaultResourcesMap: DSLResourcesMap,
67                                        calculationReason: UserStateChangeReason,
68                                        clogOpt: Option[ContextualLogger] = None): UserState = {
69
70     val clog = ContextualLogger.fromOther(
71       clogOpt,
72       logger,
73       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
74     clog.begin()
75
76     def doCompute: UserState = {
77       doFullMonthlyBilling(
78         userId,
79         billingMonthInfo,
80         currentUserState,
81         defaultResourcesMap,
82         calculationReason,
83         Some(clog))
84     }
85
86     val userCreationMillis = currentUserState.userCreationMillis
87     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
88     val billingMonthStartMillis = billingMonthInfo.startMillis
89     val billingMonthStopMillis = billingMonthInfo.stopMillis
90
91     if(billingMonthStopMillis < userCreationMillis) {
92       // If the user did not exist for this billing month, piece of cake
93       clog.debug("User did not exist before %s", userCreationDateCalc)
94
95       // NOTE: Reason here will be: InitialUserStateSetup$
96       val initialUserState0 = UserState.createInitialUserStateFrom(currentUserState)
97       val initialUserState1 = userStateStore.insertUserState(initialUserState0)
98
99       clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
100       clog.end()
101
102       initialUserState1
103     } else {
104       // Ask DB cache for the latest known user state for this billing period
105       val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
106         userId,
107         billingMonthInfo.year,
108         billingMonthInfo.month)
109
110       latestUserStateOpt match {
111         case None ⇒
112           // Not found, must compute
113           clog.debug("No user state found from cache, will have to (re)compute")
114           val result = doCompute
115           clog.end()
116           result
117
118         case Some(latestUserState) ⇒
119           // Found a "latest" user state but need to see if it is indeed the true and one latest.
120           // For this reason, we must count the events again.
121           val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
122           val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
123             userId,
124             billingMonthStartMillis,
125             billingMonthStopMillis)
126
127           val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
128           counterDiff match {
129             // ZERO, we are OK!
130             case 0 ⇒
131               // NOTE: Keep the caller's calculation reason
132               latestUserState.copyForChangeReason(calculationReason)
133
134             // We had more, so must recompute
135             case n if n > 0 ⇒
136               clog.debug(
137                 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
138               val result = doCompute
139               clog.end()
140               result
141
142             // We had less????
143             case n if n < 0 ⇒
144               val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
145               clog.warn(errMsg)
146               throw new AquariumException(errMsg)
147           }
148       }
149     }
150   }
151
152   //+ Utility methods
153   def rcDebugInfo(rcEvent: ResourceEventModel) = {
154     rcEvent.toDebugString(false)
155   }
156
157   //- Utility methods
158
159   def processResourceEvent(startingUserState: UserState,
160                            userStateWorker: UserStateWorker,
161                            currentResourceEvent: ResourceEventModel,
162                            stateChangeReason: UserStateChangeReason,
163                            billingMonthInfo: BillingMonthInfo,
164                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
165                            clogOpt: Option[ContextualLogger] = None): UserState = {
166
167     val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
168
169     var _workingUserState = startingUserState
170
171     val theResource = currentResourceEvent.safeResource
172     val theInstanceId = currentResourceEvent.safeInstanceId
173     val theValue = currentResourceEvent.value
174
175     val resourcesMap = userStateWorker.resourcesMap
176
177     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
178     clog.begin(currentResourceEventDebugInfo)
179
180     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
181
182     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
183     // But to make this decision, first we need the resource definition (and its cost policy).
184     val dslResourceOpt = resourcesMap.findResource(theResource)
185     dslResourceOpt match {
186       // We have a resource (and thus a cost policy)
187       case Some(dslResource) ⇒
188         val costPolicy = dslResource.costPolicy
189         clog.debug("Cost policy %s for %s", costPolicy, dslResource)
190         val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
191         if(!isBillable) {
192           // The resource event is not billable
193           clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
194         } else {
195           // The resource event is billable
196           // Find the previous event.
197           // This is (potentially) needed to calculate new credit amount and new resource instance amount
198           val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
199           clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
200
201           val havePreviousResourceEvent = previousResourceEventOpt.isDefined
202           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
203           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
204             // This must be the first resource event of its kind, ever.
205             // TODO: We should normally check the DB to verify the claim (?)
206             clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
207             userStateWorker.updateIgnored(currentResourceEvent)
208           } else {
209             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
210             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
211             val oldCredits = _workingUserState.totalCredits
212
213             // A. Compute new resource instance accumulating amount
214             val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
215
216             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
217
218             // B. Compute new wallet entries
219             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
220             val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
221
222             //              clog.debug("Computing full chargeslots")
223             val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
224               previousResourceEventOpt,
225               currentResourceEvent,
226               oldCredits,
227               oldAmount,
228               newAmount,
229               dslResource,
230               resourcesMap,
231               alltimeAgreements,
232               algorithmCompiler,
233               policyStore,
234               Some(clog)
235             )
236
237             // We have the chargeslots, let's associate them with the current event
238             if(fullChargeslots.length == 0) {
239               // At least one chargeslot is required.
240               throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
241             }
242             clog.debugSeq("fullChargeslots", fullChargeslots, 0)
243
244             // C. Compute new credit amount (based on the charge slots)
245             val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
246             val newCredits = oldCredits - newCreditsDiff
247
248             if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
249               val newWalletEntry = NewWalletEntry(
250                 userStateWorker.userID,
251                 newCreditsDiff,
252                 oldCredits,
253                 newCredits,
254                 TimeHelpers.nowMillis(),
255                 referenceTimeslot,
256                 billingMonthInfo.year,
257                 billingMonthInfo.month,
258                 if(havePreviousResourceEvent)
259                   List(currentResourceEvent, previousResourceEventOpt.get)
260                 else
261                   List(currentResourceEvent),
262                 fullChargeslots,
263                 dslResource,
264                 currentResourceEvent.isSynthetic
265               )
266               clog.debug("New %s", newWalletEntry)
267
268               walletEntriesBuffer += newWalletEntry
269             } else {
270               clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
271             }
272
273             _workingUserState = _workingUserState.copy(
274               totalCredits = newCredits,
275               stateChangeCounter = _workingUserState.stateChangeCounter + 1
276             )
277           }
278         }
279
280         // After processing, all events billable or not update the previous state
281         userStateWorker.updatePrevious(currentResourceEvent)
282
283         _workingUserState = _workingUserState.copy(
284           latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
285         )
286
287       // We do not have a resource (and thus, no cost policy)
288       case None ⇒
289         // Now, this is a matter of politics: what do we do if no policy was found?
290         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
291     } // dslResourceOpt match
292
293     clog.end(currentResourceEventDebugInfo)
294
295     _workingUserState
296   }
297
298   def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
299                             startingUserState: UserState,
300                             userStateWorker: UserStateWorker,
301                             stateChangeReason: UserStateChangeReason,
302                             billingMonthInfo: BillingMonthInfo,
303                             walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
304                             clogOpt: Option[ContextualLogger] = None): UserState = {
305
306     var _workingUserState = startingUserState
307
308     for(currentResourceEvent ← resourceEvents) {
309
310       _workingUserState = processResourceEvent(
311         _workingUserState,
312         userStateWorker,
313         currentResourceEvent,
314         stateChangeReason,
315         billingMonthInfo,
316         walletEntriesBuffer,
317         clogOpt
318       )
319     }
320
321     _workingUserState
322   }
323
324
325   def doFullMonthlyBilling(userId: String,
326                            billingMonthInfo: BillingMonthInfo,
327                            currentUserState: UserState,
328                            defaultResourcesMap: DSLResourcesMap,
329                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
330                            clogOpt: Option[ContextualLogger] = None): UserState = {
331
332
333     val clog = ContextualLogger.fromOther(
334       clogOpt,
335       logger,
336       "doFullMonthlyBilling(%s)", billingMonthInfo)
337     clog.begin()
338
339     val clogSome = Some(clog)
340
341     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
342       userId,
343       billingMonthInfo.previousMonth,
344       currentUserState,
345       defaultResourcesMap,
346       calculationReason.forPreviousBillingMonth,
347       clogSome
348     )
349
350     val startingUserState = previousBillingMonthUserState
351
352
353     val billingMonthStartMillis = billingMonthInfo.startMillis
354     val billingMonthEndMillis = billingMonthInfo.stopMillis
355
356     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
357     // specified in the parameters.
358     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
359     var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
360
361     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
362
363     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
364
365     // First, find and process the actual resource events from DB
366     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
367       userId,
368       billingMonthStartMillis,
369       billingMonthEndMillis)
370
371     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
372
373     _workingUserState = processResourceEvents(
374       allResourceEventsForMonth,
375       _workingUserState,
376       userStateWorker,
377       calculationReason,
378       billingMonthInfo,
379       newWalletEntries,
380       clogSome
381     )
382
383     // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
384     // ... in order to generate an implicit ON later
385     val (specialEvents, theirImplicitEnds) = userStateWorker.
386       findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
387     if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
388       clog.debug("")
389       clog.debug("Process implicitly issued events")
390       clog.debugSeq("specialEvents", specialEvents, 0)
391       clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
392     }
393
394     // Now, the previous and implicitly started must be our base for the following computation, so we create an
395     // appropriate worker
396     val specialUserStateWorker = UserStateWorker(
397       userStateWorker.userID,
398       LatestResourceEventsWorker.fromList(specialEvents),
399       ImplicitlyIssuedResourceEventsWorker.Empty,
400       IgnoredFirstResourceEventsWorker.Empty,
401       userStateWorker.resourcesMap
402     )
403
404     _workingUserState = processResourceEvents(
405       theirImplicitEnds,
406       _workingUserState,
407       specialUserStateWorker,
408       calculationReason,
409       billingMonthInfo,
410       newWalletEntries,
411       clogSome
412     )
413
414     val lastUpdateTime = TimeHelpers.nowMillis()
415
416     _workingUserState = _workingUserState.copy(
417       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
418       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
419       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
420       parentUserStateId = startingUserState.idOpt,
421       newWalletEntries = newWalletEntries.toList
422     )
423
424     clog.debug("calculationReason = %s", calculationReason)
425
426     if(calculationReason.shouldStoreUserState) {
427       val storedUserState = userStateStore.insertUserState(_workingUserState)
428       clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
429       _workingUserState = storedUserState
430     }
431
432     clog.debug("RETURN %s", _workingUserState)
433     clog.end()
434     _workingUserState
435   }
436 }