Realtime calc for continuous charging behavior
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.charging
37
38 import scala.collection.mutable
39 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
40 import gr.grnet.aquarium.computation.BillingMonthInfo
41 import gr.grnet.aquarium.charging.state.{WorkingResourcesChargingState, UserStateBootstrap, WorkingUserState, UserStateModel, StdUserState}
42 import gr.grnet.aquarium.policy.ResourceType
43 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
44 import gr.grnet.aquarium.util.LogHelpers.Debug
45 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
46 import gr.grnet.aquarium.util.LogHelpers.Warn
47 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
48 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
49
50 /**
51  *
52  * @author Christos KK Loverdos <loverdos@gmail.com>
53  */
54
55 final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Loggable {
56   lazy val policyStore = aquarium.policyStore
57   lazy val userStateStore = aquarium.userStateStore
58   lazy val resourceEventStore = aquarium.resourceEventStore
59
60   //+ Lifecycle
61   def start() {}
62
63   def stop() {}
64   //- Lifecycle
65
66
67   //+ Utility methods
68   protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
69     rcEvent.toDebugString
70   }
71   //- Utility methods
72
73   def calculateRealtimeWorkingUserState(
74       workingUserState: WorkingUserState,
75       billingMonthInfo: BillingMonthInfo,
76       realtimeMillis: Long
77   ) {
78     for( (resourceTypeName, workingResourcesState) ← workingUserState.workingStateOfResources) {
79       workingUserState.findResourceType(resourceTypeName) match {
80         case None ⇒
81           // Ignore
82
83         case Some(resourceType) ⇒
84           val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
85
86           for((resourceInstanceID, workingResourceInstanceState) ← workingResourcesState.stateOfResourceInstance) {
87             Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
88             val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
89               workingUserState.userID,
90               resourceTypeName,
91               resourceInstanceID,
92               realtimeMillis,
93               workingResourceInstanceState
94             )
95             DebugSeq(logger, "virtualEvents", virtualEvents, 1)
96
97             processResourceEvents(
98               virtualEvents,
99               workingUserState,
100               billingMonthInfo,
101               realtimeMillis
102             )
103           }
104       }
105     }
106   }
107
108   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
109       billingMonthInfo: BillingMonthInfo,
110       userStateBootstrap: UserStateBootstrap,
111       defaultResourceTypesMap: Map[String, ResourceType],
112       userStateRecorder: UserStateModel ⇒ UserStateModel
113   ): WorkingUserState = {
114
115     def computeFullMonthBillingAndSaveState(): WorkingUserState = {
116       val workingUserState = replayFullMonthBilling(
117         userStateBootstrap,
118         billingMonthInfo,
119         defaultResourceTypesMap,
120         userStateRecorder
121       )
122
123       val monthlyUserState0 = workingUserState.toUserState(
124         true,
125         billingMonthInfo.year,
126         billingMonthInfo.month,
127         ""
128       )
129
130       // We always save the state when it is a full month billing
131       val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
132
133       Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, monthlyUserState1.toJsonString)
134
135       workingUserState
136     }
137
138     val userID = userStateBootstrap.userID
139     val userCreationMillis = userStateBootstrap.userCreationMillis
140     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
141     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
142     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
143
144     if(billingMonthStopMillis < userCreationMillis) {
145       // If the user did not exist for this billing month, piece of cake
146       Debug(logger, "User did not exist before %s", userCreationDateCalc)
147
148       // TODO: The initial user state might have already been created.
149       //       First ask if it exists and compute only if not
150       val initialUserState0 = StdUserState.createInitialUserStateFromBootstrap(
151         userStateBootstrap,
152         TimeHelpers.nowMillis()
153       )
154
155       Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
156
157       // We always save the initial state
158       val initialUserState1 = userStateRecorder.apply(initialUserState0)
159
160       Debug(logger, "Stored initial state = %s", initialUserState1.toJsonString)
161
162       return initialUserState1.toWorkingUserState(defaultResourceTypesMap)
163     }
164
165     // Ask DB cache for the latest known user state for this billing period
166     val latestUserStateOpt = userStateStore.findLatestUserStateForFullMonthBilling(
167       userID,
168       billingMonthInfo)
169
170     latestUserStateOpt match {
171       case None ⇒
172         // Not found, must compute
173         Debug(logger, "No user state found from cache, will have to (re)compute")
174         computeFullMonthBillingAndSaveState
175
176       case Some(latestUserState) ⇒
177         // Found a "latest" user state but need to see if it is indeed the true and one latest.
178         // For this reason, we must count the events again.
179         val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
180         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
181           userID,
182           billingMonthStartMillis,
183           billingMonthStopMillis)
184
185         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
186         counterDiff match {
187           // ZERO, we are OK!
188           case 0 ⇒
189             // NOTE: Keep the caller's calculation reason
190             latestUserState.toWorkingUserState(defaultResourceTypesMap)
191
192           // We had more, so must recompute
193           case n if n > 0 ⇒
194             Debug(logger,
195               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
196             computeFullMonthBillingAndSaveState
197
198           // We had less????
199           case n if n < 0 ⇒
200             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
201             Warn(logger, errMsg)
202             throw new AquariumInternalError(errMsg)
203         }
204     }
205   }
206   /**
207    * Processes one resource event and computes relevant, incremental charges.
208    *
209    * @param resourceEvent
210    * @param workingUserState
211    * @param billingMonthInfo
212    */
213   def processResourceEvent(
214       resourceEvent: ResourceEventModel,
215       workingUserState: WorkingUserState,
216       billingMonthInfo: BillingMonthInfo,
217       updateLatestMillis: Boolean
218   ): Boolean = {
219
220     val resourceTypeName = resourceEvent.resource
221     val resourceTypeOpt = workingUserState.findResourceType(resourceTypeName)
222     if(resourceTypeOpt.isEmpty) {
223       // Unknown (yet) resource, ignoring event.
224       return false
225     }
226     val resourceType = resourceTypeOpt.get
227
228     val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
229     val workingResourcesState = workingUserState.workingStateOfResources.get(resourceTypeName) match {
230       case Some(existingState) ⇒
231         existingState
232
233       case None ⇒
234         // First time for this ChargingBehavior.
235         val newState = new WorkingResourcesChargingState(
236           details = mutable.Map(chargingBehavior.initialChargingDetails.toSeq:_*),
237           stateOfResourceInstance = mutable.Map()
238         )
239
240         workingUserState.workingStateOfResources(resourceTypeName) = newState
241         newState
242     }
243
244     val m0 = TimeHelpers.nowMillis()
245     val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
246       aquarium,
247       resourceEvent,
248       resourceType,
249       billingMonthInfo,
250       workingResourcesState,
251       workingUserState.workingAgreementHistory,
252       workingUserState.totalCredits,
253       workingUserState.walletEntries += _
254     )
255     val m1 = TimeHelpers.nowMillis()
256
257     if(updateLatestMillis) {
258       workingUserState.latestUpdateMillis = m1
259     }
260
261     workingUserState.updateLatestResourceEventOccurredMillis(resourceEvent.occurredMillis)
262     workingUserState.totalCredits -= creditsToSubtract
263
264     true
265   }
266
267   def processResourceEvents(
268       resourceEvents: Traversable[ResourceEventModel],
269       workingUserState: WorkingUserState,
270       billingMonthInfo: BillingMonthInfo,
271       latestUpdateMillis: Long
272   ): Unit = {
273
274     var _counter = 0
275     for(currentResourceEvent ← resourceEvents) {
276       processResourceEvent(
277         currentResourceEvent,
278         workingUserState,
279         billingMonthInfo,
280         false
281       )
282
283       _counter += 1
284     }
285
286     if(_counter > 0) {
287       workingUserState.latestUpdateMillis = latestUpdateMillis
288     }
289   }
290
291   def replayFullMonthBilling(
292       userStateBootstrap: UserStateBootstrap,
293       billingMonthInfo: BillingMonthInfo,
294       defaultResourceTypesMap: Map[String, ResourceType],
295       userStateRecorder: UserStateModel ⇒ UserStateModel
296   ): WorkingUserState = {
297
298     replayMonthChargingUpTo(
299       billingMonthInfo,
300       billingMonthInfo.monthStopMillis,
301       userStateBootstrap,
302       defaultResourceTypesMap,
303       userStateRecorder
304     )
305   }
306
307   /**
308    * Replays the charging procedure over the set of resource events that happened within the given month and up to
309    * the specified point in time.
310    *
311    * @param billingMonthInfo Which month to bill.
312    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
313    * @param userStateBootstrap
314    * @param resourceTypesMap
315    * @param userStateRecorder
316    * @return
317    */
318   def replayMonthChargingUpTo(
319       billingMonthInfo: BillingMonthInfo,
320       billingEndTimeMillis: Long,
321       userStateBootstrap: UserStateBootstrap,
322       resourceTypesMap: Map[String, ResourceType],
323       userStateRecorder: UserStateModel ⇒ UserStateModel
324   ): WorkingUserState = {
325
326     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
327     val userID = userStateBootstrap.userID
328
329     // In order to replay the full month, we start with the state at the beginning of the month.
330     val previousBillingMonthInfo = billingMonthInfo.previousMonth
331     val workingUserState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
332       previousBillingMonthInfo,
333       userStateBootstrap,
334       resourceTypesMap,
335       userStateRecorder
336     )
337
338     // FIXME the below comments
339     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
340     // specified in the parameters.
341     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
342
343     Debug(logger, "workingUserState=%s", workingUserState)
344     Debug(logger, "previousBillingMonthUserState(%s) = %s",
345       previousBillingMonthInfo.toShortDebugString,
346       workingUserState
347     )
348
349     var _rcEventsCounter = 0
350     resourceEventStore.foreachResourceEventOccurredInPeriod(
351       userID,
352       billingMonthInfo.monthStartMillis, // from start of month
353       billingEndTimeMillis               // to requested time
354     ) { currentResourceEvent ⇒
355
356       Debug(logger, "Processing %s", currentResourceEvent)
357
358       processResourceEvent(
359         currentResourceEvent,
360         workingUserState,
361         billingMonthInfo,
362         false
363       )
364
365       _rcEventsCounter += 1
366     }
367
368     if(_rcEventsCounter > 0) {
369       workingUserState.latestUpdateMillis = TimeHelpers.nowMillis()
370     }
371
372     Debug(logger, "Found %s resource events for month %s",
373       _rcEventsCounter,
374       billingMonthInfo.toShortDebugString
375     )
376
377     // FIXME Reuse the logic here...Do not erase the comment...
378     /*if(isFullMonthBilling) {
379       // For the remaining events which must contribute an implicit OFF, we collect those OFFs
380       // ... in order to generate an implicit ON later (during the next billing cycle).
381       val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
382         aquarium.chargingBehaviorOf(_),
383         billingMonthInfo.monthStopMillis
384       )
385
386       if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
387         Debug(logger, "")
388         Debug(logger, "Process implicitly issued events")
389         DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
390         DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
391       }
392
393       // Now, the previous and implicitly started must be our base for the following computation, so we create an
394       // appropriate worker
395       val specialWorkingUserState = workingUserState.newForImplicitEndsAsPreviousEvents(
396         WorkingUserState.makePreviousResourceEventMap(generatorsOfImplicitEnds)
397       )
398
399       processResourceEvents(
400         theirImplicitEnds,
401         specialWorkingUserState,
402         chargingReason,
403         billingMonthInfo
404       )
405
406       workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
407       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
408     }*/
409
410     workingUserState
411   }
412 }