2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.charging
38 import gr.grnet.aquarium.charging.state.{UserStateModel, UserStateBootstrap}
39 import gr.grnet.aquarium.computation.BillingMonthInfo
40 import gr.grnet.aquarium.message.avro.gen.{ResourcesChargingStateMsg, UserStateMsg, ResourceEventMsg}
41 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, AvroHelpers}
42 import gr.grnet.aquarium.policy.ResourceType
43 import gr.grnet.aquarium.util.LogHelpers.Debug
44 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
45 import gr.grnet.aquarium.util.LogHelpers.Warn
46 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
47 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
48 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
49 import java.util.{HashMap ⇒ JHashMap}
53 * @author Christos KK Loverdos <loverdos@gmail.com>
56 final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Loggable {
// Store handles used by this service; each one is read lazily from `aquarium` on first access.
57 lazy val policyStore = aquarium.policyStore
58 lazy val userStateStore = aquarium.userStateStore
59 lazy val resourceEventStore = aquarium.resourceEventStore
// Realtime computation over a user state: for every resource type that has recorded state, the
// charging behavior of that type creates virtual events for each resource instance, after which
// processResourceEvents is invoked.
// NOTE(review): part of the signature, the match patterns and the call arguments are not visible
// in this listing; confirm the remaining parameters and the return type against the full file.
67 def calculateRealtimeUserState(
68 userState: UserStateModel,
69 billingMonthInfo: BillingMonthInfo,
73 import scala.collection.JavaConverters.mapAsScalaMapConverter
75 val stateOfResources = userState.msg.getStateOfResources.asScala
// NOTE(review): resourceTypesMap is not referenced in the visible lines (the lookup below reads the
// Java map directly); confirm whether this val is still needed.
76 val resourceTypesMap = userState.msg.getResourceTypesMap.asScala
78 for( (resourceTypeName, workingResourcesState) ← stateOfResources) {
// Look up the resource type by name in the user state's own resource-types map.
79 userState.msg.getResourceTypesMap.get(resourceTypeName) match {
84 val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
85 val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
// One pass per resource instance recorded under this resource type.
87 for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
88 Debug(logger, "Realtime calculation for %s, %s", resourceTypeName, resourceInstanceID)
89 val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
96 DebugSeq(logger, "virtualEvents", virtualEvents, 1)
// Presumably charges the virtual events created above; the arguments are not visible here.
98 processResourceEvents(
// Finds, or computes and records, the user state at the end of the given billing month.
// Flow, as far as the visible lines show:
//  1. If the billing month stops before the user was created, an initial user state is created,
//     passed through userStateRecorder and returned right away.
//  2. Otherwise the latest stored full-month state is looked up in userStateStore. When the lookup
//     yields nothing, or when the event store reports more out-of-sync events than that state
//     recorded, the month is recomputed via computeFullMonthBillingAndSaveState.
//  3. When the event store reports fewer out-of-sync events than the stored state recorded,
//     an AquariumInternalError is thrown.
// NOTE(review): the match patterns and a number of argument lines are not visible in this listing.
109 def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
110 billingMonthInfo: BillingMonthInfo,
111 userStateBootstrap: UserStateBootstrap,
112 defaultResourceTypesMap: Map[String, ResourceType],
113 userStateRecorder: UserStateMsg ⇒ UserStateMsg
114 ): UserStateModel = {
// Local helper: replays the whole month, flags the result as a full-billing-month state for
// billingMonthInfo's year and month, passes it through userStateRecorder and wraps it in a model.
116 def computeFullMonthBillingAndSaveState(): UserStateModel = {
117 val fullMonthUserState = replayFullMonthBilling(
120 defaultResourceTypesMap,
124 val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState.msg).
125 setIsFullBillingMonth(true).
126 setBillingYear(billingMonthInfo.year).
127 setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
129 setResourceTypesMap(MessageFactory.newResourceTypeMsgsMap(defaultResourceTypesMap)).
132 // We always save the state when it is a full month billing
133 val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
135 Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
137 ModelFactory.newUserStateModel(monthlyUserState1)
140 val userID = userStateBootstrap.userID
141 val userCreationMillis = userStateBootstrap.userCreationMillis
142 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
143 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
144 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
146 if(billingMonthStopMillis < userCreationMillis) {
147 // If the user did not exist for this billing month, piece of cake
148 Debug(logger, "User did not exist before %s", userCreationDateCalc)
150 // TODO: The initial user state might have already been created.
151 // First ask if it exists and compute only if not
152 val initialUserState0 = MessageFactory.createInitialUserStateMsg(
154 defaultResourceTypesMap,
155 TimeHelpers.nowMillis()
158 Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
160 // We always save the initial state
161 val initialUserState1 = userStateRecorder.apply(initialUserState0)
163 Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
// Early exit: nothing else to compute for a month that ended before the user was created.
165 return ModelFactory.newUserStateModel(initialUserState1)
168 // Ask DB cache for the latest known user state for this billing period
169 val latestUserStateOpt = userStateStore.findLatestUserStateForFullMonthBilling(
173 latestUserStateOpt match {
175 // Not found, must compute
176 Debug(logger, "No user state found from cache, will have to (re)compute")
177 computeFullMonthBillingAndSaveState
179 case Some(latestUserState) ⇒
180 // Found a "latest" user state, but we need to check that it really is the one true latest.
181 // For this reason, we must count the events again.
182 val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
183 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
185 billingMonthStartMillis,
186 billingMonthStopMillis)
// counterDiff: how many more out-of-sync events the event store reports than the stored state recorded.
// NOTE(review): `n` in the messages below is presumably bound by case patterns not visible here.
188 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
192 // NOTE: Keep the caller's calculation reason
193 ModelFactory.newUserStateModel(latestUserState)
195 // We had more, so must recompute
198 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
199 computeFullMonthBillingAndSaveState
203 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
205 throw new AquariumInternalError(errMsg)
210 * Processes one resource event and computes relevant, incremental charges.
212 * @param resourceEvent
213 * @param userStateModel
214 * @param billingMonthInfo
216 def processResourceEvent(
217 resourceEvent: ResourceEventMsg,
218 userStateModel: UserStateModel,
219 billingMonthInfo: BillingMonthInfo,
220 updateLatestMillis: Boolean
223 val resourceTypeName = resourceEvent.getResource
// Raw Java-map lookup: a resource type that is not in the map comes back as null (checked right below).
224 val resourceType = userStateModel.msg.getResourceTypesMap.get(resourceTypeName)
225 if(resourceType eq null) {
226 // Unknown (yet) resource, ignoring event.
230 val chargingBehavior = aquarium.chargingBehaviorOf(resourceType)
// Fetch the charging state stored for this resource type. The visible lines below build a fresh
// ResourcesChargingStateMsg for the first-time case (the match patterns are not visible here).
231 val resourcesChargingState = userStateModel.msg.getStateOfResources.get(resourceTypeName) match {
233 // First time for this ChargingBehavior.
234 val newState = new ResourcesChargingStateMsg
235 newState.setResource(resourceTypeName)
236 newState.setDetails(chargingBehavior.initialChargingDetails)
237 newState.setStateOfResourceInstance(new JHashMap())
// m0 and m1 are clock readings taken before and after the charging call.
245 val m0 = TimeHelpers.nowMillis()
246 val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
251 resourcesChargingState,
// Callback handed to the charging behavior: appends each message it receives to the wallet entries.
253 msg ⇒ userStateModel.msg.getWalletEntries.add(msg)
255 val m1 = TimeHelpers.nowMillis()
// Stamp the user state with the post-charging time only when the caller asks for it.
257 if(updateLatestMillis) {
258 userStateModel.msg.setLatestUpdateMillis(m1)
// Record when the event occurred and subtract the credits reported by the charging behavior.
261 userStateModel.updateLatestResourceEventOccurredMillis(resourceEvent.getOccurredMillis)
262 userStateModel.subtractCredits(creditsToSubtract)
// Processes each of the given resource events, in traversal order, through processResourceEvent
// and sets the user state's latest-update time to latestUpdateMillis.
// NOTE(review): the arguments passed after the event itself, and the exact position of the final
// setLatestUpdateMillis call relative to the loop, are not visible in this listing; it presumably
// runs once after the loop.
267 def processResourceEvents(
268 resourceEvents: Traversable[ResourceEventMsg],
269 userState: UserStateModel,
270 billingMonthInfo: BillingMonthInfo,
271 latestUpdateMillis: Long
275 for(currentResourceEvent ← resourceEvents) {
276 processResourceEvent(
277 currentResourceEvent,
287 userState.msg.setLatestUpdateMillis(latestUpdateMillis)
// Replays charging for the given billing month by delegating to replayMonthChargingUpTo with
// billingMonthInfo.monthStopMillis as the end time.
// NOTE(review): the remaining arguments of the delegated call are not all visible in this listing.
291 def replayFullMonthBilling(
292 userStateBootstrap: UserStateBootstrap,
293 billingMonthInfo: BillingMonthInfo,
294 defaultResourceTypesMap: Map[String, ResourceType],
295 userStateRecorder: UserStateMsg ⇒ UserStateMsg
296 ): UserStateModel = {
298 replayMonthChargingUpTo(
300 billingMonthInfo.monthStopMillis,
302 defaultResourceTypesMap,
308 * Replays the charging procedure over the set of resource events that happened within the given month and up to
309 * the specified point in time.
311 * @param billingMonthInfo Which month to bill.
312 * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
313 * @param userStateBootstrap
314 * @param resourceTypesMap
315 * @param userStateRecorder
318 def replayMonthChargingUpTo(
319 billingMonthInfo: BillingMonthInfo,
320 billingEndTimeMillis: Long,
321 userStateBootstrap: UserStateBootstrap,
322 resourceTypesMap: Map[String, ResourceType],
323 userStateRecorder: UserStateMsg ⇒ UserStateMsg
324 ): UserStateModel = {
326 val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
327 val userID = userStateBootstrap.userID
329 // In order to replay the full month, we start with the state at the beginning of the month.
330 val previousBillingMonthInfo = billingMonthInfo.previousMonth
331 val userState = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
332 previousBillingMonthInfo,
338 // FIXME the below comments
339 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
340 // specified in the parameters.
341 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
343 Debug(logger, "workingUserState=%s", userState)
344 Debug(logger, "previousBillingMonthUserState(%s) = %s",
345 previousBillingMonthInfo.toShortDebugString,
349 var _rcEventsCounter = 0
350 resourceEventStore.foreachResourceEventOccurredInPeriod(
352 billingMonthInfo.monthStartMillis, // from start of month
353 billingEndTimeMillis // to requested time
354 ) { currentResourceEvent ⇒
356 Debug(logger, "Processing %s", currentResourceEvent)
358 processResourceEvent(
359 currentResourceEvent,
365 _rcEventsCounter += 1
368 if(_rcEventsCounter > 0) {
369 userState.msg.setLatestUpdateMillis(TimeHelpers.nowMillis())
372 Debug(logger, "Found %s resource events for month %s",
374 billingMonthInfo.toShortDebugString
377 // FIXME Reuse the logic here...Do not erase the comment...
378 /*if(isFullMonthBilling) {
379 // For the remaining events which must contribute an implicit OFF, we collect those OFFs
380 // ... in order to generate an implicit ON later (during the next billing cycle).
381 val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
382 aquarium.chargingBehaviorOf(_),
383 billingMonthInfo.monthStopMillis
386 if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
388 Debug(logger, "Process implicitly issued events")
389 DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
390 DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
393 // Now, the previous and implicitly started must be our base for the following computation, so we create an
394 // appropriate worker
395 val specialWorkingUserState = workingUserState.newForImplicitEndsAsPreviousEvents(
396 WorkingUserState.makePreviousResourceEventMap(generatorsOfImplicitEnds)
399 processResourceEvents(
401 specialWorkingUserState,
406 workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
407 workingUserState.totalCredits = specialWorkingUserState.totalCredits