Being devops-friendly until a user creation event arrives
[aquarium] / src / main / scala / gr / grnet / aquarium / computation / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.computation
37
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.computation.data._
44 import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason}
45 import gr.grnet.aquarium.event.model.NewWalletEntry
46 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
47 import gr.grnet.aquarium.{Aquarium, AquariumInternalError, AquariumException}
48
49 /**
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>
52  */
53 class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
54
55   protected lazy val aquarium           = _aquarium()
56   protected lazy val storeProvider      = aquarium.storeProvider
57   protected lazy val accounting         = new Accounting {}
58   protected lazy val algorithmCompiler  = aquarium.algorithmCompiler
59   protected lazy val policyStore        = storeProvider.policyStore
60   protected lazy val userStateStore     = storeProvider.userStateStore
61   protected lazy val resourceEventStore = storeProvider.resourceEventStore
62
63   def findUserStateAtEndOfBillingMonth(userStateBootstrap: UserStateBootstrappingData,
64                                        billingMonthInfo: BillingMonthInfo,
65                                        defaultResourcesMap: DSLResourcesMap,
66                                        calculationReason: UserStateChangeReason,
67                                        clogOpt: Option[ContextualLogger] = None): UserState = {
68
69     val clog = ContextualLogger.fromOther(
70       clogOpt,
71       logger,
72       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
73     clog.begin()
74
75     def doCompute: UserState = {
76       doFullMonthlyBilling(
77         userStateBootstrap,
78         billingMonthInfo,
79         defaultResourcesMap,
80         calculationReason,
81         Some(clog))
82     }
83
84     val userID = userStateBootstrap.userID
85     val userCreationMillis = userStateBootstrap.userCreationMillis
86     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
87     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
88     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
89
90     if(billingMonthStopMillis < userCreationMillis) {
91       // If the user did not exist for this billing month, piece of cake
92       clog.debug("User did not exist before %s", userCreationDateCalc)
93
94       // NOTE: Reason here will be: InitialUserStateSetup$
95       val initialUserState0 = UserState.createInitialUserState(userStateBootstrap)
96       val initialUserState1 = userStateStore.insertUserState(initialUserState0)
97
98       clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
99       clog.end()
100
101       initialUserState1
102     } else {
103       // Ask DB cache for the latest known user state for this billing period
104       val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
105         userID,
106         billingMonthInfo.year,
107         billingMonthInfo.month)
108
109       latestUserStateOpt match {
110         case None ⇒
111           // Not found, must compute
112           clog.debug("No user state found from cache, will have to (re)compute")
113           val result = doCompute
114           clog.end()
115           result
116
117         case Some(latestUserState) ⇒
118           // Found a "latest" user state but need to see if it is indeed the true and one latest.
119           // For this reason, we must count the events again.
120           val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
121           val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
122             userID,
123             billingMonthStartMillis,
124             billingMonthStopMillis)
125
126           val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
127           counterDiff match {
128             // ZERO, we are OK!
129             case 0 ⇒
130               // NOTE: Keep the caller's calculation reason
131               latestUserState.copyForChangeReason(calculationReason)
132
133             // We had more, so must recompute
134             case n if n > 0 ⇒
135               clog.debug(
136                 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
137               val result = doCompute
138               clog.end()
139               result
140
141             // We had less????
142             case n if n < 0 ⇒
143               val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
144               clog.warn(errMsg)
145               throw new AquariumException(errMsg)
146           }
147       }
148     }
149   }
150
151   //+ Utility methods
152   protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
153     rcEvent.toDebugString(false)
154   }
155   //- Utility methods
156
157   def processResourceEvent(startingUserState: UserState,
158                            userStateWorker: UserStateWorker,
159                            currentResourceEvent: ResourceEventModel,
160                            stateChangeReason: UserStateChangeReason,
161                            billingMonthInfo: BillingMonthInfo,
162                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
163                            clogOpt: Option[ContextualLogger] = None): UserState = {
164
165     val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
166
167     var _workingUserState = startingUserState
168
169     val theResource = currentResourceEvent.safeResource
170     val theInstanceId = currentResourceEvent.safeInstanceId
171     val theValue = currentResourceEvent.value
172
173     val resourcesMap = userStateWorker.resourcesMap
174
175     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
176     clog.begin(currentResourceEventDebugInfo)
177
178     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
179
180     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
181     // But to make this decision, first we need the resource definition (and its cost policy).
182     val dslResourceOpt = resourcesMap.findResource(theResource)
183     dslResourceOpt match {
184       // We have a resource (and thus a cost policy)
185       case Some(dslResource) ⇒
186         val costPolicy = dslResource.costPolicy
187         clog.debug("Cost policy %s for %s", costPolicy, dslResource)
188         val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
189         if(!isBillable) {
190           // The resource event is not billable
191           clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
192         } else {
193           // The resource event is billable
194           // Find the previous event.
195           // This is (potentially) needed to calculate new credit amount and new resource instance amount
196           val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
197           clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
198
199           val havePreviousResourceEvent = previousResourceEventOpt.isDefined
200           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
201           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
202             // This must be the first resource event of its kind, ever.
203             // TODO: We should normally check the DB to verify the claim (?)
204             clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
205             userStateWorker.updateIgnored(currentResourceEvent)
206           } else {
207             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
208             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
209             val oldCredits = _workingUserState.totalCredits
210
211             // A. Compute new resource instance accumulating amount
212             val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
213
214             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
215
216             // B. Compute new wallet entries
217             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
218             val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
219
220             //              clog.debug("Computing full chargeslots")
221             val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
222               previousResourceEventOpt,
223               currentResourceEvent,
224               oldCredits,
225               oldAmount,
226               newAmount,
227               dslResource,
228               resourcesMap,
229               alltimeAgreements,
230               algorithmCompiler,
231               policyStore,
232               Some(clog)
233             )
234
235             // We have the chargeslots, let's associate them with the current event
236             if(fullChargeslots.length == 0) {
237               // At least one chargeslot is required.
238               throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
239             }
240             clog.debugSeq("fullChargeslots", fullChargeslots, 0)
241
242             // C. Compute new credit amount (based on the charge slots)
243             val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
244             val newCredits = oldCredits - newCreditsDiff
245
246             if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
247               val newWalletEntry = NewWalletEntry(
248                 userStateWorker.userID,
249                 newCreditsDiff,
250                 oldCredits,
251                 newCredits,
252                 TimeHelpers.nowMillis(),
253                 referenceTimeslot,
254                 billingMonthInfo.year,
255                 billingMonthInfo.month,
256                 if(havePreviousResourceEvent)
257                   List(currentResourceEvent, previousResourceEventOpt.get)
258                 else
259                   List(currentResourceEvent),
260                 fullChargeslots,
261                 dslResource,
262                 currentResourceEvent.isSynthetic
263               )
264               clog.debug("New %s", newWalletEntry)
265
266               walletEntriesBuffer += newWalletEntry
267             } else {
268               clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
269             }
270
271             _workingUserState = _workingUserState.copy(
272               totalCredits = newCredits,
273               stateChangeCounter = _workingUserState.stateChangeCounter + 1
274             )
275           }
276         }
277
278         // After processing, all events billable or not update the previous state
279         userStateWorker.updatePrevious(currentResourceEvent)
280
281         _workingUserState = _workingUserState.copy(
282           latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
283         )
284
285       // We do not have a resource (and thus, no cost policy)
286       case None ⇒
287         // Now, this is a matter of politics: what do we do if no policy was found?
288         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
289     } // dslResourceOpt match
290
291     clog.end(currentResourceEventDebugInfo)
292
293     _workingUserState
294   }
295
296   def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
297                             startingUserState: UserState,
298                             userStateWorker: UserStateWorker,
299                             stateChangeReason: UserStateChangeReason,
300                             billingMonthInfo: BillingMonthInfo,
301                             walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
302                             clogOpt: Option[ContextualLogger] = None): UserState = {
303
304     var _workingUserState = startingUserState
305
306     for(currentResourceEvent ← resourceEvents) {
307
308       _workingUserState = processResourceEvent(
309         _workingUserState,
310         userStateWorker,
311         currentResourceEvent,
312         stateChangeReason,
313         billingMonthInfo,
314         walletEntriesBuffer,
315         clogOpt
316       )
317     }
318
319     _workingUserState
320   }
321
322
323   def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData,
324                            billingMonthInfo: BillingMonthInfo,
325                            defaultResourcesMap: DSLResourcesMap,
326                            calculationReason: UserStateChangeReason,
327                            clogOpt: Option[ContextualLogger] = None): UserState = {
328
329     val userID = userStateBootstrap.userID
330
331     val clog = ContextualLogger.fromOther(
332       clogOpt,
333       logger,
334       "doFullMonthlyBilling(%s)", billingMonthInfo.toShortDebugString)
335     clog.begin()
336
337     val clogSome = Some(clog)
338
339     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
340       userStateBootstrap,
341       billingMonthInfo.previousMonth,
342       defaultResourcesMap,
343       calculationReason.forBillingMonthInfo(billingMonthInfo.previousMonth),
344       clogSome
345     )
346
347     val startingUserState = previousBillingMonthUserState
348
349
350     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
351     val billingMonthEndMillis = billingMonthInfo.monthStopMillis
352
353     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
354     // specified in the parameters.
355     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
356     var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
357
358     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
359
360     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
361
362     // First, find and process the actual resource events from DB
363     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
364       userID,
365       billingMonthStartMillis,
366       billingMonthEndMillis)
367
368     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
369
370     _workingUserState = processResourceEvents(
371       allResourceEventsForMonth,
372       _workingUserState,
373       userStateWorker,
374       calculationReason,
375       billingMonthInfo,
376       newWalletEntries,
377       clogSome
378     )
379
380     // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
381     // ... in order to generate an implicit ON later
382     val (specialEvents, theirImplicitEnds) = userStateWorker.
383       findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
384     if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
385       clog.debug("")
386       clog.debug("Process implicitly issued events")
387       clog.debugSeq("specialEvents", specialEvents, 0)
388       clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
389     }
390
391     // Now, the previous and implicitly started must be our base for the following computation, so we create an
392     // appropriate worker
393     val specialUserStateWorker = UserStateWorker(
394       userStateWorker.userID,
395       LatestResourceEventsWorker.fromList(specialEvents),
396       ImplicitlyIssuedResourceEventsWorker.Empty,
397       IgnoredFirstResourceEventsWorker.Empty,
398       userStateWorker.resourcesMap
399     )
400
401     _workingUserState = processResourceEvents(
402       theirImplicitEnds,
403       _workingUserState,
404       specialUserStateWorker,
405       calculationReason,
406       billingMonthInfo,
407       newWalletEntries,
408       clogSome
409     )
410
411     val lastUpdateTime = TimeHelpers.nowMillis()
412
413     _workingUserState = _workingUserState.copy(
414       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
415       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
416       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
417       parentUserStateId = startingUserState.idOpt,
418       newWalletEntries = newWalletEntries.toList
419     )
420
421     clog.debug("calculationReason = %s", calculationReason)
422
423     if(calculationReason.shouldStoreUserState) {
424       val storedUserState = userStateStore.insertUserState(_workingUserState)
425       clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
426       _workingUserState = storedUserState
427     }
428
429     clog.debug("RETURN %s", _workingUserState)
430     clog.end()
431     _workingUserState
432   }
433 }