WIP: New state machine for message processing
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.charging
37
38 import gr.grnet.aquarium.computation.BillingMonthInfo
39 import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
40 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
41 import gr.grnet.aquarium.util.LogHelpers.Debug
42 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
43 import gr.grnet.aquarium.util.LogHelpers.Warn
44 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
45 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
46 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
47 import java.util.{Map ⇒ JMap}
48 import gr.grnet.aquarium.event.CreditsModel
49 import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
50
51 /**
52  *
53  * @author Christos KK Loverdos <loverdos@gmail.com>
54  */
55
56 final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Loggable {
57   lazy val policyStore = aquarium.policyStore
58   lazy val userStateStore = aquarium.userStateStore
59   lazy val resourceEventStore = aquarium.resourceEventStore
60
61   //+ Lifecycle
62   def start() {}
63
64   def stop() {}
65   //- Lifecycle
66
67   def calculateRealtimeUserState(
68       userAgreementHistoryModel: UserAgreementHistoryModel,
69       userStateMsg: UserStateMsg,
70       billingMonthInfo: BillingMonthInfo,
71       resourceMapping: JMap[String, ResourceTypeMsg],
72       realtimeMillis: Long
73   ) {
74
75     import scala.collection.JavaConverters.mapAsScalaMapConverter
76
77     val stateOfResources = userStateMsg.getStateOfResources.asScala
78
79     for( (resourceName, workingResourcesState) ← stateOfResources) {
80       resourceMapping.get(resourceName) match {
81         case null ⇒
82           // Ignore
83
84         case resourceTypeMsg ⇒
85           val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
86           val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
87
88           for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
89             Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
90             val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
91               userStateMsg.getUserID,
92               resourceName,
93               resourceInstanceID,
94               realtimeMillis,
95               resourceInstanceState
96             )
97             DebugSeq(logger, "virtualEvents", virtualEvents, 1)
98
99             processResourceEvents(
100               virtualEvents,
101               userAgreementHistoryModel,
102               userStateMsg,
103               billingMonthInfo,
104               resourceMapping,
105               realtimeMillis
106             )
107           }
108       }
109     }
110   }
111
112   def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
113       userAgreementHistoryModel: UserAgreementHistoryModel,
114       billingMonthInfo: BillingMonthInfo,
115       resourceMapping: JMap[String, ResourceTypeMsg],
116       userStateRecorder: UserStateMsg ⇒ UserStateMsg
117   ): UserStateMsg = {
118
119     def computeFullMonthBillingAndSaveState(): UserStateMsg = {
120       val fullMonthUserState = replayFullMonthBilling(
121         userAgreementHistoryModel,
122         billingMonthInfo,
123         resourceMapping,
124         userStateRecorder
125       )
126
127       val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
128         setIsFullBillingMonth(true).
129         setBillingYear(billingMonthInfo.year).
130         setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
131         setOriginalID("").
132         build()
133
134       // We always save the state when it is a full month billing
135       val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
136
137       Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
138
139       monthlyUserState1
140     }
141
142     val userID = userAgreementHistoryModel.userID
143     val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis
144     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
145     val billingMonthStartMillis = billingMonthInfo.monthStartMillis
146     val billingMonthStopMillis = billingMonthInfo.monthStopMillis
147
148     if(billingMonthStopMillis < userCreationMillis) {
149       // If the user did not exist for this billing month, piece of cake
150       Debug(logger, "User did not exist before %s", userCreationDateCalc)
151
152       // TODO: The initial user state might have already been created.
153       //       First ask if it exists and compute only if not
154       val initialUserState0 = MessageFactory.newInitialUserStateMsg(
155         userID,
156         CreditsModel.from(0.0),
157         TimeHelpers.nowMillis()
158       )
159
160       Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
161
162       // We always save the initial state
163       val initialUserState1 = userStateRecorder.apply(initialUserState0)
164
165       Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
166
167       return initialUserState1
168     }
169
170     // Ask DB cache for the latest known user state for this billing period
171     val latestUserStateOpt = userStateStore.findLatestUserStateForFullMonthBilling(
172       userID,
173       billingMonthInfo)
174
175     latestUserStateOpt match {
176       case None ⇒
177         // Not found, must compute
178         Debug(logger, "No user state found from cache, will have to (re)compute")
179         computeFullMonthBillingAndSaveState
180
181       case Some(latestUserState) ⇒
182         // Found a "latest" user state but need to see if it is indeed the true and one latest.
183         // For this reason, we must count the events again.
184         val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
185         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
186           userID,
187           billingMonthStartMillis,
188           billingMonthStopMillis)
189
190         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
191         counterDiff match {
192           // ZERO, we are OK!
193           case 0 ⇒
194             // NOTE: Keep the caller's calculation reason
195             latestUserState
196
197           // We had more, so must recompute
198           case n if n > 0 ⇒
199             Debug(logger,
200               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
201             computeFullMonthBillingAndSaveState
202
203           // We had less????
204           case n if n < 0 ⇒
205             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
206             Warn(logger, errMsg)
207             throw new AquariumInternalError(errMsg)
208         }
209     }
210   }
211   /**
212    * Processes one resource event and computes relevant, incremental charges.
213    *
214    * @param resourceEvent
215    * @param userStateMsg
216    * @param billingMonthInfo
217    */
218   def processResourceEvent(
219       resourceEvent: ResourceEventMsg,
220       userAgreementHistoryModel: UserAgreementHistoryModel,
221       userStateMsg: UserStateMsg,
222       billingMonthInfo: BillingMonthInfo,
223       updateLatestMillis: Boolean,
224       resourceMapping: JMap[String, ResourceTypeMsg]
225   ): Boolean = {
226     logger.warn("processResourceEvent:workingUserState=%s".format(userStateMsg)) //
227     val resourceName = resourceEvent.getResource
228     val resourceTypeMsg = resourceMapping.get(resourceName)
229     if(resourceTypeMsg eq null) {
230       // Unknown (yet) resource, ignoring event.
231       return false
232     }
233
234     val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
235     val resourcesChargingState = userStateMsg.getStateOfResources.get(resourceName) match {
236       case null ⇒
237         // First time for this ChargingBehavior.
238         val newState = MessageFactory.newResourcesChargingStateMsg(
239           resourceName,
240           chargingBehavior.initialChargingDetails
241         )
242         userStateMsg.getStateOfResources.put(resourceName, newState)
243         newState
244       case existingState ⇒
245         existingState
246     }
247
248     val m0 = TimeHelpers.nowMillis()
249     val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
250       aquarium,
251       resourceEvent,
252       resourceTypeMsg,
253       billingMonthInfo,
254       resourcesChargingState,
255       userAgreementHistoryModel,
256       userStateMsg,
257       msg ⇒ userStateMsg.getWalletEntries.add(msg)
258     )
259     val m1 = TimeHelpers.nowMillis()
260
261     if(updateLatestMillis) {
262       userStateMsg.setLatestUpdateMillis(m1)
263     }
264
265     MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
266     MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
267
268     true
269   }
270
271   def processResourceEvents(
272       resourceEvents: Traversable[ResourceEventMsg],
273       userAgreementHistoryModel: UserAgreementHistoryModel,
274       userStateMsg: UserStateMsg,
275       billingMonthInfo: BillingMonthInfo,
276       resourceMapping: JMap[String, ResourceTypeMsg],
277       latestUpdateMillis: Long
278   ): Unit = {
279
280     var _counter = 0
281     for(currentResourceEvent ← resourceEvents) {
282       processResourceEvent(
283         currentResourceEvent,
284         userAgreementHistoryModel,
285         userStateMsg,
286         billingMonthInfo,
287         false,
288         resourceMapping
289       )
290
291       _counter += 1
292     }
293
294     if(_counter > 0) {
295       userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
296     }
297   }
298
299   def replayFullMonthBilling(
300       userAgreementHistoryModel: UserAgreementHistoryModel,
301       billingMonthInfo: BillingMonthInfo,
302       resourceMapping: JMap[String, ResourceTypeMsg],
303       userStateRecorder: UserStateMsg ⇒ UserStateMsg
304   ): UserStateMsg = {
305
306     replayMonthChargingUpTo(
307       userAgreementHistoryModel,
308       billingMonthInfo,
309       billingMonthInfo.monthStopMillis,
310       resourceMapping,
311       userStateRecorder
312     )
313   }
314
315   /**
316    * Replays the charging procedure over the set of resource events that happened within the given month and up to
317    * the specified point in time.
318    *
319    * @param billingMonthInfo Which month to bill.
320    * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
321    * @param userStateRecorder
322    * @return
323    */
324   def replayMonthChargingUpTo(
325       userAgreementHistoryModel: UserAgreementHistoryModel,
326       billingMonthInfo: BillingMonthInfo,
327       billingEndTimeMillis: Long,
328       resourceMapping: JMap[String, ResourceTypeMsg],
329       userStateRecorder: UserStateMsg ⇒ UserStateMsg
330   ): UserStateMsg = {
331
332     val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
333     val userID = userAgreementHistoryModel.userID
334
335     // In order to replay the full month, we start with the state at the beginning of the month.
336     val previousBillingMonthInfo = billingMonthInfo.previousMonth
337     val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
338       userAgreementHistoryModel,
339       previousBillingMonthInfo,
340       resourceMapping,
341       userStateRecorder
342     )
343
344     // FIXME the below comments
345     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
346     // specified in the parameters.
347     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
348
349     Debug(logger, "workingUserState=%s", userStateMsg)
350     Debug(logger, "previousBillingMonthUserState(%s) = %s",
351       previousBillingMonthInfo.toShortDebugString,
352       userStateMsg
353     )
354
355     var _rcEventsCounter = 0
356     resourceEventStore.foreachResourceEventOccurredInPeriod(
357       userID,
358       billingMonthInfo.monthStartMillis, // from start of month
359       billingEndTimeMillis               // to requested time
360     ) { currentResourceEvent ⇒
361
362       Debug(logger, "Processing %s", currentResourceEvent)
363
364       processResourceEvent(
365         currentResourceEvent,
366         userAgreementHistoryModel,
367         userStateMsg,
368         billingMonthInfo,
369         false,
370         resourceMapping
371       )
372
373       _rcEventsCounter += 1
374     }
375
376     if(_rcEventsCounter > 0) {
377       userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
378     }
379
380     Debug(logger, "Found %s resource events for month %s",
381       _rcEventsCounter,
382       billingMonthInfo.toShortDebugString
383     )
384
385     // FIXME Reuse the logic here...Do not erase the comment...
386     /*if(isFullMonthBilling) {
387       // For the remaining events which must contribute an implicit OFF, we collect those OFFs
388       // ... in order to generate an implicit ON later (during the next billing cycle).
389       val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
390         aquarium.chargingBehaviorOf(_),
391         billingMonthInfo.monthStopMillis
392       )
393
394       if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
395         Debug(logger, "")
396         Debug(logger, "Process implicitly issued events")
397         DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
398         DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
399       }
400
401       // Now, the previous and implicitly started must be our base for the following computation, so we create an
402       // appropriate worker
403       val specialWorkingUserState = workingUserState.newForImplicitEndsAsPreviousEvents(
404         WorkingUserState.makePreviousResourceEventMap(generatorsOfImplicitEnds)
405       )
406
407       processResourceEvents(
408         theirImplicitEnds,
409         specialWorkingUserState,
410         chargingReason,
411         billingMonthInfo
412       )
413
414       workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
415       workingUserState.totalCredits    = specialWorkingUserState.totalCredits
416     }*/
417
418     userStateMsg
419   }
420 }