2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.charging
38 import gr.grnet.aquarium.computation.BillingMonthInfo
39 import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserStateMsg, ResourceEventMsg}
40 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, AvroHelpers}
41 import gr.grnet.aquarium.util.LogHelpers.Debug
42 import gr.grnet.aquarium.util.LogHelpers.DebugSeq
43 import gr.grnet.aquarium.util.LogHelpers.Warn
44 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
45 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
46 import gr.grnet.aquarium.{AquariumInternalError, AquariumAwareSkeleton}
47 import java.util.{Map ⇒ JMap}
48 import gr.grnet.aquarium.event.CreditsModel
49 import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
53 * @author Christos KK Loverdos <loverdos@gmail.com>
56 final class ChargingService extends AquariumAwareSkeleton with Lifecycle with Loggable {
// Store handles are taken from the `aquarium` instance. Being `lazy val`s, each lookup is
// deferred until the first time the corresponding store is accessed.
// NOTE(review): `policyStore` is not referenced in the lines visible in this listing.
57 lazy val policyStore = aquarium.policyStore
58 lazy val userStateStore = aquarium.userStateStore
59 lazy val resourceEventStore = aquarium.resourceEventStore
// Realtime charging pass over the resources recorded in `userStateMsg`: for every resource
// instance, the charging behavior of the resource type is asked for "virtual" events, and
// processResourceEvents is then invoked.
// NOTE(review): the end of the signature and parts of the call argument lists are not visible
// in this listing; the comments below describe only the lines that are present.
67 def calculateRealtimeUserState(
68 userAgreementHistoryModel: UserAgreementHistoryModel,
69 userStateMsg: UserStateMsg,
70 billingMonthInfo: BillingMonthInfo,
71 resourceMapping: JMap[String, ResourceTypeMsg],
// Local import that enables `.asScala` on the Java maps used below.
75 import scala.collection.JavaConverters.mapAsScalaMapConverter
// Scala view of the per-resource state map held by the user state message.
77 val stateOfResources = userStateMsg.getStateOfResources.asScala
79 for( (resourceName, workingResourcesState) ← stateOfResources) {
// Look the resource type up by name in the (Java) resource mapping.
80 resourceMapping.get(resourceName) match {
// Variable pattern: binds the looked-up resource type for this resource name.
84 case resourceTypeMsg ⇒
85 val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
86 val stateOfResourceInstance = workingResourcesState.getStateOfResourceInstance.asScala
// One realtime computation per resource instance of this resource.
88 for((resourceInstanceID, resourceInstanceState) ← stateOfResourceInstance) {
89 Debug(logger, "Realtime calculation for %s, %s", resourceName, resourceInstanceID)
90 val virtualEvents = chargingBehavior.createVirtualEventsForRealtimeComputation(
91 userStateMsg.getUserID,
97 DebugSeq(logger, "virtualEvents", virtualEvents, 1)
// NOTE(review): presumably the virtual events are what gets processed here; the first
// argument of this call is not visible in this listing - confirm.
99 processResourceEvents(
101 userAgreementHistoryModel,
// Obtains the user state for the end of the given billing month.
//  - If the billing month ended before the user was created, a fresh initial state is
//    created, recorded through `userStateRecorder` and returned.
//  - Otherwise the user state store is asked for the latest full-month state. If none is found,
//    or if the resource event store now reports more out-of-sync events than that state
//    accounts for, the month is recomputed via computeFullMonthBillingAndSaveState.
//  - If the store reports fewer out-of-sync events than the state accounts for, an
//    AquariumInternalError is thrown.
// NOTE(review): several lines (return type, some match arms, call arguments) are not visible
// in this listing; in particular the branch taken when the counters agree is not shown.
112 def findOrCalculateWorkingUserStateAtEndOfBillingMonth(
113 userAgreementHistoryModel: UserAgreementHistoryModel,
114 billingMonthInfo: BillingMonthInfo,
115 resourceMapping: JMap[String, ResourceTypeMsg],
116 userStateRecorder: UserStateMsg ⇒ UserStateMsg
// Local helper: replays the whole month through replayFullMonthBilling, copies the result into
// a new message flagged as a full billing month for billingMonthInfo's year and month, and
// passes that message to `userStateRecorder`.
119 def computeFullMonthBillingAndSaveState(): UserStateMsg = {
120 val fullMonthUserState = replayFullMonthBilling(
121 userAgreementHistoryModel,
127 val monthlyUserState0 = UserStateMsg.newBuilder(fullMonthUserState).
128 setIsFullBillingMonth(true).
129 setBillingYear(billingMonthInfo.year).
130 setBillingMonth(billingMonthInfo.month). // FIXME What about the billingMonthDay?
134 // We always save the state when it is a full month billing
135 val monthlyUserState1 = userStateRecorder.apply(monthlyUserState0)
137 Debug(logger, "Stored full %s %s", billingMonthInfo.toDebugString, AvroHelpers.jsonStringOfSpecificRecord(monthlyUserState1))
142 val userID = userAgreementHistoryModel.userID
143 val userCreationMillis = userAgreementHistoryModel.unsafeUserCreationMillis
// Date wrapper around the creation time; in the visible lines it is only used for logging.
144 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
145 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
146 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// The whole billing month lies before the user's creation time.
148 if(billingMonthStopMillis < userCreationMillis) {
149 // If the user did not exist for this billing month, piece of cake
150 Debug(logger, "User did not exist before %s", userCreationDateCalc)
152 // TODO: The initial user state might have already been created.
153 // First ask if it exists and compute only if not
154 val initialUserState0 = MessageFactory.newInitialUserStateMsg(
// NOTE(review): presumably the initial credit amount (zero) - confirm against MessageFactory.
156 CreditsModel.from(0.0),
157 TimeHelpers.nowMillis()
160 Debug(logger, "Created (from bootstrap) initial user state %s", initialUserState0)
162 // We always save the initial state
163 val initialUserState1 = userStateRecorder.apply(initialUserState0)
165 Debug(logger, "Stored initial state = %s", AvroHelpers.jsonStringOfSpecificRecord(initialUserState1))
// Early exit with the recorded initial state; no events are replayed for such a month.
167 return initialUserState1
170 // Ask DB cache for the latest known user state for this billing period
171 val latestUserStateOpt = userStateStore.findLatestUserStateForFullMonthBilling(
175 latestUserStateOpt match {
177 // Not found, must compute
178 Debug(logger, "No user state found from cache, will have to (re)compute")
179 computeFullMonthBillingAndSaveState
181 case Some(latestUserState) ⇒
182 // Found a "latest" user state, but we need to check whether it is indeed the one true latest.
183 // For this reason, we must count the events again.
184 val latestStateOOSEventsCounter = latestUserState.getBillingPeriodOutOfSyncResourceEventsCounter
185 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
187 billingMonthStartMillis,
188 billingMonthStopMillis)
// Positive: the event store now reports more out-of-sync events than the stored state counted.
// Negative: it reports fewer, which is treated as an inconsistency further down.
190 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
194 // NOTE: Keep the caller's calculation reason
197 // We had more, so must recompute
200 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
201 computeFullMonthBillingAndSaveState
205 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
207 throw new AquariumInternalError(errMsg)
212 * Processes one resource event and computes relevant, incremental charges.
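214 * @param resourceEvent The resource event to process.
215 * @param userStateMsg The user state message; it is updated in place while the event is processed.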
216 * @param billingMonthInfo
// Charges a single resource event against `userStateMsg`, which is modified in place.
// Events whose resource name has no entry in `resourceMapping` take the "unknown resource"
// branch below. For known resources the per-resource charging state is looked up (and
// registered on first use), the charging behavior processes the event, and the user state
// is updated with the outcome.
// NOTE(review): the return type and some argument lists are not visible in this listing.
218 def processResourceEvent(
219 resourceEvent: ResourceEventMsg,
220 userAgreementHistoryModel: UserAgreementHistoryModel,
221 userStateMsg: UserStateMsg,
222 billingMonthInfo: BillingMonthInfo,
223 updateLatestMillis: Boolean,
224 resourceMapping: JMap[String, ResourceTypeMsg]
// NOTE(review): this logs the whole user state at WARN level on every call, while the rest of
// the class uses Debug(...) for tracing - looks like leftover debugging; confirm the level.
226 logger.warn("processResourceEvent:workingUserState=%s".format(userStateMsg)) //
227 val resourceName = resourceEvent.getResource
// Java map lookup: yields null when the resource name is not mapped.
228 val resourceTypeMsg = resourceMapping.get(resourceName)
229 if(resourceTypeMsg eq null) {
230 // Unknown (yet) resource, ignoring event.
234 val chargingBehavior = aquarium.chargingBehaviorOf(resourceTypeMsg)
// Fetch the charging state kept for this resource; when none exists yet a new one is built
// from the behavior's initial charging details and stored back into the user state.
235 val resourcesChargingState = userStateMsg.getStateOfResources.get(resourceName) match {
237 // First time for this ChargingBehavior.
238 val newState = MessageFactory.newResourcesChargingStateMsg(
240 chargingBehavior.initialChargingDetails
242 userStateMsg.getStateOfResources.put(resourceName, newState)
// m0 / m1 are wall-clock timestamps taken before and after the charging call.
248 val m0 = TimeHelpers.nowMillis()
249 val (walletEntriesCount, creditsToSubtract) = chargingBehavior.processResourceEvent(
254 resourcesChargingState,
255 userAgreementHistoryModel,
// Callback handed to the charging behavior: appends each message it receives to the wallet
// entries of the user state.
257 msg ⇒ userStateMsg.getWalletEntries.add(msg)
259 val m1 = TimeHelpers.nowMillis()
// Only stamp the state with the post-processing time when the caller asked for it.
261 if(updateLatestMillis) {
262 userStateMsg.setLatestUpdateMillis(m1)
// Hand the event's occurrence time and the computed credits to the MessageHelpers update
// routines for this user state.
265 MessageHelpers.updateLatestResourceEventOccurredMillis(userStateMsg, resourceEvent.getOccurredMillis)
266 MessageHelpers.subtractCredits(userStateMsg, creditsToSubtract)
// Batch variant of processResourceEvent: processes every event of `resourceEvents` in
// traversal order against the same `userStateMsg`, then stamps the state with the
// caller-supplied `latestUpdateMillis`.
// NOTE(review): the remaining arguments of the inner call (including the value passed for
// `updateLatestMillis`) are not visible in this listing.
271 def processResourceEvents(
272 resourceEvents: Traversable[ResourceEventMsg],
273 userAgreementHistoryModel: UserAgreementHistoryModel,
274 userStateMsg: UserStateMsg,
275 billingMonthInfo: BillingMonthInfo,
276 resourceMapping: JMap[String, ResourceTypeMsg],
277 latestUpdateMillis: Long
281 for(currentResourceEvent ← resourceEvents) {
282 processResourceEvent(
283 currentResourceEvent,
284 userAgreementHistoryModel,
// The update timestamp is set explicitly here, from the value given by the caller.
295 userStateMsg.setLatestUpdateMillis(latestUpdateMillis)
// Replays charging for a complete billing month by delegating to replayMonthChargingUpTo,
// passing the month's stop time (billingMonthInfo.monthStopMillis) as the billing end time.
299 def replayFullMonthBilling(
300 userAgreementHistoryModel: UserAgreementHistoryModel,
301 billingMonthInfo: BillingMonthInfo,
302 resourceMapping: JMap[String, ResourceTypeMsg],
303 userStateRecorder: UserStateMsg ⇒ UserStateMsg
306 replayMonthChargingUpTo(
307 userAgreementHistoryModel,
309 billingMonthInfo.monthStopMillis,
316 * Replays the charging procedure over the set of resource events that happened within the given month and up to
317 * the specified point in time.
319 * @param billingMonthInfo Which month to bill.
320 * @param billingEndTimeMillis Bill from start of month up to (and including) this time.
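321 * @param userStateRecorder Function used to record a user state; it receives the state to record and returns the recorded state.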
// Starts from the user state at the end of the previous billing month and processes, one by
// one, the resource events that the event store reports for the period from the start of
// `billingMonthInfo` up to `billingEndTimeMillis`.
// Note the mutual recursion: obtaining the previous month's state may itself replay that
// month (findOrCalculateWorkingUserStateAtEndOfBillingMonth -> replayFullMonthBilling ->
// this method); that chain ends when a month before the user's creation is reached or when
// findOrCalculateWorkingUserStateAtEndOfBillingMonth does not need to recompute.
// NOTE(review): the return type and several call arguments are not visible in this listing.
324 def replayMonthChargingUpTo(
325 userAgreementHistoryModel: UserAgreementHistoryModel,
326 billingMonthInfo: BillingMonthInfo,
327 billingEndTimeMillis: Long,
328 resourceMapping: JMap[String, ResourceTypeMsg],
329 userStateRecorder: UserStateMsg ⇒ UserStateMsg
// True when billing runs exactly up to the end of the month. In the lines visible here it is
// only referenced from the commented-out block further down.
332 val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
333 val userID = userAgreementHistoryModel.userID
335 // In order to replay the full month, we start with the state at the beginning of the month.
336 val previousBillingMonthInfo = billingMonthInfo.previousMonth
337 val userStateMsg = findOrCalculateWorkingUserStateAtEndOfBillingMonth(
338 userAgreementHistoryModel,
339 previousBillingMonthInfo,
344 // FIXME the below comments
345 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
346 // specified in the parameters.
347 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
349 Debug(logger, "workingUserState=%s", userStateMsg)
350 Debug(logger, "previousBillingMonthUserState(%s) = %s",
351 previousBillingMonthInfo.toShortDebugString,
// Counts the events handed to processResourceEvent by the callback below.
355 var _rcEventsCounter = 0
356 resourceEventStore.foreachResourceEventOccurredInPeriod(
358 billingMonthInfo.monthStartMillis, // from start of month
359 billingEndTimeMillis // to requested time
360 ) { currentResourceEvent ⇒
362 Debug(logger, "Processing %s", currentResourceEvent)
364 processResourceEvent(
365 currentResourceEvent,
366 userAgreementHistoryModel,
373 _rcEventsCounter += 1
// The update timestamp is refreshed only when at least one event was processed.
376 if(_rcEventsCounter > 0) {
377 userStateMsg.setLatestUpdateMillis(TimeHelpers.nowMillis())
380 Debug(logger, "Found %s resource events for month %s",
382 billingMonthInfo.toShortDebugString
385 // FIXME Reuse the logic here...Do not erase the comment...
386 /*if(isFullMonthBilling) {
387 // For the remaining events which must contribute an implicit OFF, we collect those OFFs
388 // ... in order to generate an implicit ON later (during the next billing cycle).
389 val (generatorsOfImplicitEnds, theirImplicitEnds) = workingUserState.findAndRemoveGeneratorsOfImplicitEndEvents(
390 aquarium.chargingBehaviorOf(_),
391 billingMonthInfo.monthStopMillis
394 if(generatorsOfImplicitEnds.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
396 Debug(logger, "Process implicitly issued events")
397 DebugSeq(logger, "generatorsOfImplicitEnds", generatorsOfImplicitEnds, 0)
398 DebugSeq(logger, "theirImplicitEnds", theirImplicitEnds, 0)
401 // Now, the previous and implicitly started must be our base for the following computation, so we create an
402 // appropriate worker
403 val specialWorkingUserState = workingUserState.newForImplicitEndsAsPreviousEvents(
404 WorkingUserState.makePreviousResourceEventMap(generatorsOfImplicitEnds)
407 processResourceEvents(
409 specialWorkingUserState,
414 workingUserState.walletEntries ++= specialWorkingUserState.walletEntries
415 workingUserState.totalCredits = specialWorkingUserState.totalCredits