Preparing the move to master
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user
37
38
39 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, ResourceEventStore}
41 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
44 import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
45 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
46 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
47
48 /**
49  *
50  * @author Christos KK Loverdos <loverdos@gmail.com>
51  */
52 class UserStateComputations extends Loggable {
53   def createFirstUserState(userId: String,
54                            millis: Long = TimeHelpers.nowMillis,
55                            agreementName: String = DSLAgreement.DefaultAgreementName) = {
56     val now = 0L
57     UserState(
58       userId,
59       now,
60       0L,
61       false,
62       null,
63       ImplicitlyIssuedResourceEventsSnapshot(List(), now),
64       Nil, Nil,
65       LatestResourceEventsSnapshot(List(), now),
66       0L, 0L,
67       ActiveStateSnapshot(false, now),
68       CreditSnapshot(0, now),
69       AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
70       RolesSnapshot(List(), now),
71       OwnedResourcesSnapshot(List(), now)
72     )
73   }
74
75   def createFirstUserState(userId: String, agreementName: String, resourcesMap: DSLResourcesMap) = {
76       val now = 0L
77       UserState(
78         userId,
79         now,
80         0L,
81         false,
82         null,
83         ImplicitlyIssuedResourceEventsSnapshot(List(), now),
84         Nil, Nil,
85         LatestResourceEventsSnapshot(List(), now),
86         0L, 0L,
87         ActiveStateSnapshot(false, now),
88         CreditSnapshot(0, now),
89         AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
90         RolesSnapshot(List(), now),
91         OwnedResourcesSnapshot(List(), now)
92       )
93     }
94
95   def findUserStateAtEndOfBillingMonth(userId: String,
96                                        billingMonthInfo: BillingMonthInfo,
97                                        userStateStore: UserStateStore,
98                                        resourceEventStore: ResourceEventStore,
99                                        policyStore: PolicyStore,
100                                        userCreationMillis: Long,
101                                        currentUserState: UserState,
102                                        zeroUserState: UserState, 
103                                        defaultPolicy: DSLPolicy,
104                                        defaultResourcesMap: DSLResourcesMap,
105                                        accounting: Accounting,
106                                        contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
107
108     val clog = ContextualLogger.fromOther(
109       contextualLogger,
110       logger,
111       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
112     clog.begin()
113
114     def doCompute: Maybe[UserState] = {
115       clog.debug("Computing full month billing")
116       doFullMonthlyBilling(
117         userId,
118         billingMonthInfo,
119         userStateStore,
120         resourceEventStore,
121         policyStore,
122         userCreationMillis,
123         currentUserState,
124         zeroUserState,
125         defaultPolicy,
126         defaultResourcesMap,
127         accounting,
128         Just(clog))
129     }
130
131     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
132     val billingMonthStartMillis = billingMonthInfo.startMillis
133     val billingMonthStopMillis  = billingMonthInfo.stopMillis
134
135     if(billingMonthStopMillis < userCreationMillis) {
136       // If the user did not exist for this billing month, piece of cake
137       clog.debug("User did not exist before %s", userCreationDateCalc)
138       clog.debug("Returning ZERO state %s".format(zeroUserState))
139       clog.endWith(Just(zeroUserState))
140     } else {
141       // Ask DB cache for the latest known user state for this billing period
142       val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
143         userId,
144         billingMonthInfo.year,
145         billingMonthInfo.month)
146
147       latestUserStateM match {
148         case NoVal ⇒
149           // Not found, must compute
150           clog.debug("No user state found from cache, will have to (re)compute")
151           clog.endWith(doCompute)
152           
153         case failed @ Failed(_, _) ⇒
154           clog.warn("Failure while quering cache for user state: %s", failed)
155           clog.endWith(failed)
156
157         case Just(latestUserState) ⇒
158           // Found a "latest" user state but need to see if it is indeed the true and one latest.
159           // For this reason, we must count the events again.
160          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
161          val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
162            userId,
163            billingMonthStartMillis,
164            billingMonthStopMillis)
165
166          actualOOSEventsCounterM match {
167            case NoVal ⇒
168              val errMsg = "No counter computed for out of sync events. Should at least be zero."
169              clog.warn(errMsg)
170              clog.endWith(Failed(new Exception(errMsg)))
171
172            case failed @ Failed(_, _) ⇒
173              clog.warn("Failure while querying for out of sync events: %s", failed)
174              clog.endWith(failed)
175
176            case Just(actualOOSEventsCounter) ⇒
177              val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
178              counterDiff match {
179                // ZERO, we are OK!
180                case 0 ⇒
181                  latestUserStateM
182
183                // We had more, so must recompute
184                case n if n > 0 ⇒
185                  clog.debug(
186                    "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
187                  clog.endWith(doCompute)
188
189                // We had less????
190                case n if n < 0 ⇒
191                  val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
192                  clog.warn(errMsg)
193                  clog.endWith(Failed(new Exception(errMsg)))
194              }
195          }
196       }
197     }
198   }
199
200   def doFullMonthlyBilling(userId: String,
201                            billingMonthInfo: BillingMonthInfo,
202                            userStateStore: UserStateStore,
203                            resourceEventStore: ResourceEventStore,
204                            policyStore: PolicyStore,
205                            userCreationMillis: Long,
206                            currentUserState: UserState,
207                            zeroUserState: UserState,
208                            defaultPolicy: DSLPolicy,
209                            defaultResourcesMap: DSLResourcesMap,
210                            accounting: Accounting,
211                            contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
212
213     def rcDebugInfo(rcEvent: ResourceEvent) = {
214       rcEvent.toDebugString(defaultResourcesMap, false)
215     }
216
217     val clog = ContextualLogger.fromOther(
218       contextualLogger,
219       logger,
220       "doFullMonthlyBilling(%s)", billingMonthInfo)
221     clog.begin()
222
223     val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
224       userId,
225       billingMonthInfo.previousMonth,
226       userStateStore,
227       resourceEventStore,
228       policyStore,
229       userCreationMillis,
230       currentUserState,
231       zeroUserState,
232       defaultPolicy,
233       defaultResourcesMap,
234       accounting,
235       Just(clog)
236     )
237     
238     previousBillingMonthUserStateM match {
239       case NoVal ⇒
240         null // not really... (must throw an exception here probably...)
241       case failed @ Failed(e, _) ⇒
242         throw e
243       case Just(startingUserState) ⇒
244         // This is the real deal
245
246         // This is a collection of all the latest resource events.
247         // We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
248         // ones.
249         // Will be updated on processing the next resource event.
250         val previousResourceEvents = startingUserState.latestResourceEventsSnapshot.toMutableWorker
251         clog.debug("previousResourceEvents = %s", previousResourceEvents)
252
253         val billingMonthStartMillis = billingMonthInfo.startMillis
254         val billingMonthEndMillis = billingMonthInfo.stopMillis
255
256         // Keep the working (current) user state. This will get updated as we proceed with billing for the month
257         // specified in the parameters.
258         var _workingUserState = startingUserState
259
260         // Prepare the implicitly terminated resource events from previous billing period
261         val implicitlyTerminatedResourceEvents = _workingUserState.implicitlyTerminatedSnapshot.toMutableWorker
262         if(implicitlyTerminatedResourceEvents.size > 0) {
263           clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
264           clog.withIndent {
265             implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
266           }
267         }
268
269         // Keep the resource events from this period that were first (and unused) of their kind
270         val ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty
271
272         /**
273          * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
274          * events and b) the explicit previous resource events. If the event is found, it is removed from the
275          * respective source.
276          *
277          * If the event is not found, then this must be for a new resource instance.
278          * (and probably then some `zero` resource event must be implied as the previous one)
279          * 
280          * @param resource
281          * @param instanceId
282          * @return
283          */
284         def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
285           // implicitly terminated events are checked first
286           implicitlyTerminatedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
287             case just @ Just(_) ⇒
288               just
289             case NoVal ⇒
290               // explicit previous resource events are checked second
291               previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
292                 case just @ Just(_) ⇒
293                   just
294                 case noValOrFailed ⇒
295                   noValOrFailed
296               }
297             case failed ⇒
298               failed
299           }
300         }
301
302         // Find the actual resource events from DB
303         val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
304           userId,
305           billingMonthStartMillis,
306           billingMonthEndMillis)
307         var _eventCounter = 0
308
309         clog.debug("resourceEventStore = %s".format(resourceEventStore))
310         if(allResourceEventsForMonth.size > 0) {
311           clog.debug("Found %s resource events, starting processing...", allResourceEventsForMonth.size)
312         } else {
313           clog.debug("Not found any resource events")
314         }
315
316         for {
317           currentResourceEvent <- allResourceEventsForMonth
318         } {
319           _eventCounter = _eventCounter + 1
320           val theResource = currentResourceEvent.safeResource
321           val theInstanceId = currentResourceEvent.safeInstanceId
322           val theValue = currentResourceEvent.value
323
324           clog.indent()
325           clog.debug("")
326           clog.debug("Processing %s", currentResourceEvent)
327           clog.debug("+========= %s", rcDebugInfo(currentResourceEvent))
328
329           clog.indent()
330
331           if(previousResourceEvents.size > 0) {
332             clog.debug("%s previousResourceEvents", previousResourceEvents.size)
333             clog.withIndent {
334               previousResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
335             }
336           }
337           if(implicitlyTerminatedResourceEvents.size > 0) {
338             clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
339             clog.withIndent {
340               implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
341             }
342           }
343           if(ignoredFirstResourceEvents.size > 0) {
344             clog.debug("%s ignoredFirstResourceEvents", ignoredFirstResourceEvents.size)
345             clog.withIndent {
346               ignoredFirstResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
347             }
348           }
349
350           // Ignore the event if it is not billable (but still record it in the "previous" stuff).
351           // But to make this decision, first we need the resource definition (and its cost policy).
352           val resourceDefM = defaultResourcesMap.findResourceM(theResource)
353           resourceDefM match {
354             // We have a resource (and thus a cost policy)
355             case Just(resourceDef) ⇒
356               val costPolicy = resourceDef.costPolicy
357               clog.debug("Cost policy: %s", costPolicy)
358               val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
359               isBillable match {
360                 // The resource event is not billable
361                 case false ⇒
362                   clog.debug("Ignoring not billable event %s", rcDebugInfo(currentResourceEvent))
363
364                 // The resource event is billable
365                 case true ⇒
366                   // Find the previous event.
367                   // This is (potentially) needed to calculate new credit amount and new resource instance amount
368                   val previousResourceEventM = findAndRemovePreviousResourceEvent(theResource, theInstanceId)
369                   clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
370
371                   val havePreviousResourceEvent = previousResourceEventM.isJust
372                   val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
373                   if(needPreviousResourceEvent && !havePreviousResourceEvent) {
374                     // This must be the first resource event of its kind, ever.
375                     // TODO: We should normally check the DB to verify the claim (?)
376                     clog.info("Ignoring first event of its kind %s", rcDebugInfo(currentResourceEvent))
377                     ignoredFirstResourceEvents.updateResourceEvent(currentResourceEvent)
378                   } else {
379                     val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
380                     val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
381                     val oldCredits = _workingUserState.creditsSnapshot.creditAmount
382
383                     // A. Compute new resource instance accumulating amount
384                     val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
385                     
386                     clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
387
388                     // B. Compute new wallet entries
389                     val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
390
391                     val fullChargeslotsM = accounting.computeFullChargeslots(
392                       previousResourceEventM,
393                       currentResourceEvent,
394                       oldCredits,
395                       oldAmount,
396                       newAmount,
397                       resourceDef,
398                       defaultResourcesMap,
399                       alltimeAgreements,
400                       SimpleCostPolicyAlgorithmCompiler,
401                       policyStore,
402                       Just(clog)
403                     )
404
405                     // We have the chargeslots, let's associate them with the current event
406                     fullChargeslotsM match {
407                       case Just(fullChargeslots) ⇒
408                         if(fullChargeslots.length == 0) {
409                           // At least one chargeslot is required.
410                           throw new Exception("No chargeslots computed")
411                         }
412                         clog.debug("chargeslots:")
413                         clog.withIndent {
414                           for(fullChargeslot <- fullChargeslots) {
415                             clog.debug("%s", fullChargeslot)
416                           }
417                         }
418                         
419                         // C. Compute new credit amount (based on the charge slots)
420                         val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
421                         val newCredits = oldCredits + newCreditsDiff
422                         clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
423
424                         val newWalletEntry = NewWalletEntry(
425                           userId,
426                           newCreditsDiff,
427                           oldCredits,
428                           newCredits,
429                           TimeHelpers.nowMillis,
430                           billingMonthInfo.year,
431                           billingMonthInfo.month,
432                           currentResourceEvent,
433                           previousResourceEventM.toOption,
434                           fullChargeslots,
435                           resourceDef
436                         )
437
438                         clog.debug("New %s", newWalletEntry)
439
440                       case NoVal ⇒
441                         // At least one chargeslot is required.
442                         throw new Exception("No chargeslots computed")
443
444                       case failed @ Failed(e, m) ⇒
445                         throw new Exception(m, e)
446                     }
447                   }
448
449               }
450
451               // After processing, all event, billable or not update the previous state
452               previousResourceEvents.updateResourceEvent(currentResourceEvent)
453
454             // We do not have a resource (and no cost policy)
455             case NoVal ⇒
456               // Now, this is a matter of politics: what do we do if no policy was found?
457               clog.error("No cost policy for %s", rcDebugInfo(currentResourceEvent))
458
459             // Could not retrieve resource (unlikely to happen)
460             case failed @ Failed(e, m) ⇒
461               clog.error("Error obtaining cost policy for %s", rcDebugInfo(currentResourceEvent))
462               clog.error(e, m)
463           }
464
465           clog.unindent()
466           clog.debug("-========= %s", rcDebugInfo(currentResourceEvent))
467           clog.unindent()
468         }
469         
470
471         clog.endWith(_workingUserState)
472     }
473   }
474 }