Several fixes here and there => Removed all exceptions caused by Avro. Rolled back...
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ChargingService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.charging
37
38 import gr.grnet.aquarium.charging.state.{UserStateModel, UserStateBootstrap}
39 import gr.grnet.aquarium.computation.BillingMonthInfo
40 import gr.grnet.aquarium.message.avro.gen.{ResourcesChargingStateMsg, UserStateMsg, ResourceEventMsg}
41 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, AvroHelpers}
42 import gr.grnet.aquarium.policy.ResourceType
43 import gr.grnet.aquarium.util.LogHelpers.Debug
44 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
45 import gr.grnet.aquarium.util.LogHelpers.Warn
46 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
47 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
48 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
49 import java.util.{HashMap ⇒ JHashMap}
50
/**
 * Charging computations on top of the user state: "realtime" computations driven by virtual events,
 * incremental processing of resource events, and replaying of the resource events of a billing month.
 *
 * All state mutations are applied in place on the [[gr.grnet.aquarium.charging.state.UserStateModel]]
 * that is passed to (or obtained by) each method.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Loggable {
  lazy val policyStore = aquarium.policyStore
  lazy val userStateStore = aquarium.userStateStore
  lazy val resourceEventStore = aquarium.resourceEventStore

  //+ Lifecycle
  /** Nothing to do on start. */
  def start(): Unit = {}

  /** Nothing to do on stop. */
  def stop(): Unit = {}
  //- Lifecycle

  /**
   * Updates the given user state as if charging had to be settled at `realtimeMillis`.
   *
   * For every resource type that has charging state and is present in the resource types map of the user state,
   * and for every resource instance of that type, the respective charging behavior is asked to create virtual
   * events, which are then processed via [[processResourceEvents]]. Resource types that are missing from the
   * resource types map are silently skipped.
   *
   * @param userState        The user state to update (mutated in place).
   * @param billingMonthInfo The billing month, passed on to the processing of the virtual events.
   * @param realtimeMillis   The point in time of the realtime computation. It is given to the charging behavior
   *                         and is also used as the latest update time of the user state when at least one
   *                         virtual event of a resource instance is processed.
   */
  def calculateRealtimeUserState(
      userState: UserStateModel,
      billingMonthInfo: BillingMonthInfo,
      realtimeMillis: Long
  ): Unit = {

    import scala.collection.JavaConverters.mapAsScalaMapConverter

    val stateOfResources = userState.msg.getStateOfResources.asScala

    for((resourceTypeName, workingResourcesState) ← stateOfResources) {
      // The Avro-generated map is a Java map, so a missing key is reported as null.
      userState.msg.getResourceTypesMap.get(resourceTypeName) match {
        case null ⇒
          // Ignore

        case resourceType ⇒
          val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
          val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala

          for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
            Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
            val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
              userState.userID,
              resourceTypeName,
              resourceInstanceID,
              realtimeMillis,
              resourceInstanceState
            )
            DebugSeq(logger, "virtualEvents", virtualEvents, 1)

            processResourceEvents(
              virtualEvents,
              userState,
              billingMonthInfo,
              realtimeMillis
            )
          }
      }
    }
  }

  /**
   * Provides the user state as of the end of the given billing month.
   *
   *  - If the billing month ended before the user was created, an initial user state is created from the
   *    bootstrap data, recorded via `userStateRecorder` and returned.
   *  - Otherwise the user state store is asked for the latest full-month state. If there is none, or if the store
   *    now reports more out-of-sync resource events for the period than the found state has accounted for,
   *    the whole month is replayed (see [[replayFullMonthBilling]]) and the result is recorded via
   *    `userStateRecorder`.
   *  - If the out-of-sync counters agree, the found state is used as is.
   *
   * Note that replaying a month needs the state at the end of the previous month, so this method is mutually
   * recursive with [[replayMonthChargingUpTo]]; the recursion stops at a month that ended before the user
   * was created or at a month with an up-to-date stored state.
   *
   * @param billingMonthInfo        The billing month of interest.
   * @param userStateBootstrap      Bootstrap data of the user (user ID, creation time, ...).
   * @param defaultResourceTypesMap The resource types to use for newly created states.
   * @param userStateRecorder       Records a user state and gives back the recorded version.
   * @return The model of the (found, created or recomputed) user state.
   * @throws AquariumInternalError if the store reports fewer out-of-sync events than the stored user state has
   *                               already accounted for.
   */
  def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
      billingMonthInfo: BillingMonthInfo,
      userStateBootstrap: UserStateBootstrap,
      defaultResourceTypesMap: Map[String, ResourceType],
      userStateRecorder: UserStateMsg ⇒ UserStateMsg
  ): UserStateModel = {

    // Replays the whole month, marks the result as a full billing month state and records it.
    def computeFullMonthBillingAndSaveState(): UserStateModel = {
      val fullMonthUserState = replayFullMonthBilling(
        userStateBootstrap,
        billingMonthInfo,
        defaultResourceTypesMap,
        userStateRecorder
      )

      val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState.msg).
        setIsFullBillingMonth(true).
        setBillingYear(billingMonthInfo.year).
        setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
        setOriginalID("").
        setResourceTypesMap(MessageFactory.newResourceTypeMsgsMap(defaultResourceTypesMap)).
        build()

      // We always save the state when it is a full month billing
      val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)

      Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))

      ModelFactory.newUserStateModel(monthlyUserState1)
    }

    val userID = userStateBootstrap.userID
    val userCreationMillis = userStateBootstrap.userCreationMillis
    val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
    val billingMonthStartMillis = billingMonthInfo.monthStartMillis
    val billingMonthStopMillis = billingMonthInfo.monthStopMillis

    if(billingMonthStopMillis < userCreationMillis) {
      // If the user did not exist for this billing month, piece of cake
      Debug(logger, "User did not exist before %s", userCreationDateCalc)

      // TODO: The initial user state might have already been created.
      //       First ask if it exists and compute only if not
      val initialUserState0 = MessageFactory.createInitialUserStateMsg(
        userStateBootstrap,
        defaultResourceTypesMap,
        TimeHelpers.nowMillis()
      )

      Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)

      // We always save the initial state
      val initialUserState1 = userStateRecorder.apply(initialUserState0)

      Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))

      ModelFactory.newUserStateModel(initialUserState1)
    }
    else {
      // Ask DB cache for the latest known user state for this billing period
      val latestUserStateOpt = userStateStore.findLatestUserStateForFullMonthBilling(
        userID,
        billingMonthInfo)

      latestUserStateOpt match {
        case None ⇒
          // Not found, must compute
          Debug(logger, "No user state found from cache, will have to (re)compute")
          computeFullMonthBillingAndSaveState()

        case Some(latestUserState) ⇒
          // Found a "latest" user state but need to see if it is indeed the true and one latest.
          // For this reason, we must count the events again.
          val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
          val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
            userID,
            billingMonthStartMillis,
            billingMonthStopMillis)

          val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
          counterDiff match {
            // ZERO, we are OK!
            case 0 ⇒
              // NOTE: Keep the caller's calculation reason
              ModelFactory.newUserStateModel(latestUserState)

            // We had more, so must recompute
            case n if n > 0 ⇒
              Debug(logger,
                "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
              computeFullMonthBillingAndSaveState()

            // We had less???? (the only remaining possibility is n < 0)
            case n ⇒
              val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
              Warn(logger, errMsg)
              throw new AquariumInternalError(errMsg)
          }
      }
    }
  }

  /**
   * Processes one resource event and computes relevant, incremental charges.
   *
   * The event is ignored if its resource type is not present in the resource types map of the user state.
   * Otherwise the charging behavior of the resource type processes the event, new wallet entries are appended
   * to the wallet entries of the user state, the latest resource event time of the user state is updated and
   * the computed credits are subtracted from it.
   *
   * @param resourceEvent      The resource event to process.
   * @param userStateModel     The user state to update (mutated in place).
   * @param billingMonthInfo   The billing month, passed on to the charging behavior.
   * @param updateLatestMillis If `true`, the latest update time of the user state is set to the time the
   *                           charging behavior finished processing the event.
   * @return `false` if the event was ignored because of an unknown resource type, `true` otherwise.
   */
  def processResourceEvent(
      resourceEvent: ResourceEventMsg,
      userStateModel: UserStateModel,
      billingMonthInfo: BillingMonthInfo,
      updateLatestMillis: Boolean
  ): Boolean = {

    val resourceTypeName = resourceEvent.getResource
    val resourceType = userStateModel.msg.getResourceTypesMap.get(resourceTypeName)
    if(resourceType eq null) {
      // Unknown (yet) resource, ignoring event.
      return false
    }

    val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
    val stateOfResources = userStateModel.msg.getStateOfResources
    val resourcesChargingState = stateOfResources.get(resourceTypeName) match {
      case null ⇒
        // First time for this ChargingBehavior.
        val newState = new ResourcesChargingStateMsg
        newState.setResource(resourceTypeName)
        newState.setDetails(chargingBehavior.initialChargingDetails)
        newState.setStateOfResourceInstance(new JHashMap())
        // Register the new state with the user state, otherwise whatever the charging behavior records in it
        // is dropped when this method returns and the next event of this resource type starts from scratch.
        stateOfResources.put(resourceTypeName, newState)
        newState

      case existingState ⇒
        existingState
    }

    // Only the credits are needed here; the number of wallet entries is not used.
    val (_, creditsToSubtract) = chargingBehavior.processResourceEvent(
      aquarium,
      resourceEvent,
      resourceType,
      billingMonthInfo,
      resourcesChargingState,
      userStateModel,
      msg ⇒ userStateModel.msg.getWalletEntries.add(msg)
    )
    val m1 = TimeHelpers.nowMillis()

    if(updateLatestMillis) {
      userStateModel.msg.setLatestUpdateMillis(m1)
    }

    userStateModel.updateLatestResourceEventOccurredMillis(resourceEvent.getOccurredMillis)
    userStateModel.subtractCredits(creditsToSubtract)

    true
  }

  /**
   * Processes the given resource events one by one, in traversal order, via [[processResourceEvent]].
   *
   * The latest update time of the user state is not touched per event; it is set once, to `latestUpdateMillis`,
   * and only if there was at least one event to process.
   *
   * @param resourceEvents     The resource events to process.
   * @param userState          The user state to update (mutated in place).
   * @param billingMonthInfo   The billing month, passed on to the processing of each event.
   * @param latestUpdateMillis The latest update time to set on the user state if any event was processed.
   */
  def processResourceEvents(
      resourceEvents: Traversable[ResourceEventMsg],
      userState: UserStateModel,
      billingMonthInfo: BillingMonthInfo,
      latestUpdateMillis: Long
  ): Unit = {

    // Counted by hand so that the events are traversed exactly once.
    var _counter = 0
    for(currentResourceEvent ← resourceEvents) {
      processResourceEvent(
        currentResourceEvent,
        userState,
        billingMonthInfo,
        updateLatestMillis = false
      )

      _counter += 1
    }

    if(_counter > 0) {
      userState.msg.setLatestUpdateMillis(latestUpdateMillis)
    }
  }

  /**
   * Replays the charging procedure for the whole billing month, i.e. calls [[replayMonthChargingUpTo]] with
   * the end of the month as the end time.
   *
   * @param userStateBootstrap      Bootstrap data of the user.
   * @param billingMonthInfo        Which month to bill.
   * @param defaultResourceTypesMap The resource types to use for newly created states.
   * @param userStateRecorder       Records a user state and gives back the recorded version.
   * @return The user state after all the resource events of the month have been processed.
   */
  def replayFullMonthBilling(
      userStateBootstrap: UserStateBootstrap,
      billingMonthInfo: BillingMonthInfo,
      defaultResourceTypesMap: Map[String, ResourceType],
      userStateRecorder: UserStateMsg ⇒ UserStateMsg
  ): UserStateModel = {

    replayMonthChargingUpTo(
      billingMonthInfo,
      billingMonthInfo.monthStopMillis,
      userStateBootstrap,
      defaultResourceTypesMap,
      userStateRecorder
    )
  }

  /**
   * Replays the charging procedure over the set of resource events that happened within the given month and up to
   * the specified point in time.
   *
   * The starting point is the user state at the end of the previous billing month, as provided by
   * [[findOrCalculateWorkingUserStateAtEndOfBillingMonth]]. That state is then updated in place with every
   * resource event the resource event store reports for the period. If at least one event was processed,
   * the latest update time of the state is set to the current time.
   *
   * @param billingMonthInfo Which month to bill.
   * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
   * @param userStateBootstrap Bootstrap data of the user; also the source of the user ID.
   * @param resourceTypesMap The resource types to use when the previous month's state has to be created.
   * @param userStateRecorder Records a user state and gives back the recorded version. It is only used for
   *                          obtaining the previous month's state; the state returned from here is not recorded
   *                          by this method.
   * @return The updated user state.
   */
  def replayMonthChargingUpTo(
      billingMonthInfo: BillingMonthInfo,
      billingEndTimeMillis: Long,
      userStateBootstrap: UserStateBootstrap,
      resourceTypesMap: Map[String, ResourceType],
      userStateRecorder: UserStateMsg ⇒ UserStateMsg
  ): UserStateModel = {

    // Currently only needed by the disabled implicit-ends logic at the end of this method.
    val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
    val userID = userStateBootstrap.userID

    // In order to replay the full month, we start with the state at the beginning of the month.
    val previousBillingMonthInfo = billingMonthInfo.previousMonth
    val userState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
      previousBillingMonthInfo,
      userStateBootstrap,
      resourceTypesMap,
      userStateRecorder
    )

    // FIXME the below comments
    // Keep the working (current) user state. This will get updated as we proceed with billing for the month
    // specified in the parameters.
    // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies

    Debug(logger, "workingUserState=%s", userState)
    Debug(logger, "previousBillingMonthUserState(%s) = %s",
      previousBillingMonthInfo.toShortDebugString,
      userState
    )

    var _rcEventsCounter = 0
    resourceEventStore.foreachResourceEventOccurredInPeriod(
      userID,
      billingMonthInfo.monthStartMillis, // from start of month
      billingEndTimeMillis               // to requested time
    ) { currentResourceEvent ⇒

      Debug(logger, "Processing %s", currentResourceEvent)

      // The latest update time is set once, after all the events, instead of once per event.
      processResourceEvent(
        currentResourceEvent,
        userState,
        billingMonthInfo,
        updateLatestMillis = false
      )

      _rcEventsCounter += 1
    }

    if(_rcEventsCounter > 0) {
      userState.msg.setLatestUpdateMillis(TimeHelpers.nowMillis())
    }

    Debug(logger, "Found %s resource events for month %s",
      _rcEventsCounter,
      billingMonthInfo.toShortDebugString
    )

    // FIXME Reuse the logic here...Do not erase the comment...
    /*if(isFullMonthBilling) {
      // For the remaining events which must contribute an implicit OFF, we collect those OFFs
      // ... in order to generate an implicit ON later (during the next billing cycle).
      val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
        aquarium.chargingBehaviorOf(_),
        billingMonthInfo.monthStopMillis
      )

      if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
        Debug(logger, "")
        Debug(logger, "Process implicitly issued events")
        DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
        DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
      }

      // Now, the previous and implicitly started must be our base for the following computation, so we create an
      // appropriate worker
      val specialWorkingUserState = workingUserState.newForImplicitEndsAsPreviousEvents(
        WorkingUserState.makePreviousResourceEventMap(generatorsOfImplicitEnds)
      )

      processResourceEvents(
        theirImplicitEnds,
        specialWorkingUserState,
        chargingReason,
        billingMonthInfo
      )

      workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
      workingUserState.totalCredits    = specialWorkingUserState.totalCredits
    }*/

    userState
  }
}