2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.computation
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.computation.state.parts._
43 import gr.grnet.aquarium.event.model.NewWalletEntry
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
46 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
47 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
51 * @author Christos KK Loverdos <loverdos@gmail.com>
53 final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
// The constructor takes `_aquarium` by name and it is captured here in a lazy val,
// so the Aquarium instance is not evaluated until one of these members is first used.
55 lazy val aquarium = _aquarium
// Stores are obtained through the Aquarium's store provider, again lazily.
57 lazy val storeProvider = aquarium.storeProvider
// Anonymous instance of the TimeslotComputations trait; used for chargeslot computation.
58 lazy val timeslotComputations = new TimeslotComputations {}
59 lazy val algorithmCompiler = aquarium.algorithmCompiler
60 lazy val policyStore = storeProvider.policyStore
// Named "ForRead": the methods in this class look states up through this store and
// persist them through the `storeFunc` callbacks they receive instead.
61 lazy val userStateStoreForRead = storeProvider.userStateStore
62 lazy val resourceEventStore = storeProvider.resourceEventStore
/**
 * Finds the user state as of the end of the given billing month, computing and
 * storing it when no up-to-date stored state is available.
 *
 * Flow, as far as it can be read from the statements below:
 *  - If the billing month ends before the user's creation time, an initial user state is
 *    created from the bootstrap, stored through `storeFunc` and returned immediately.
 *  - Otherwise the latest stored full-month-billing state is looked up. When none is found,
 *    the full month is billed and the result stored.
 *  - When one is found, the out-of-sync resource events of the billing period are counted
 *    again and compared with the counter recorded in that state. Going by the comments and
 *    messages on each branch: no difference reuses the found state (with the caller's
 *    calculation reason), more events trigger a recomputation, fewer events are reported
 *    as a DB inconsistency.
 *    NOTE(review): the conditions on `counterDiff` are not part of the lines documented
 *    here; confirm the mapping of branches to conditions.
 *
 * @param userStateBootstrap  supplies the user ID and the user creation time
 * @param billingMonthInfo    the billing month whose end-of-month state is requested
 * @param defaultResourcesMap resources map; presumably forwarded to the billing
 *                            computation (TODO confirm)
 * @param calculationReason   the caller's reason; recorded on the states produced here
 * @param storeFunc           persists a user state; its result is the state used afterwards
 * @param clogOpt             optional contextual logger; presumably the parent of the
 *                            logger created here (TODO confirm)
 * @throws AquariumInternalError when fewer out-of-sync events are found than the stored
 *                               state recorded
 */
64 def findUserStateAtEndOfBillingMonth(
65 userStateBootstrap: UserStateBootstrap,
66 billingMonthInfo: BillingMonthInfo,
67 defaultResourcesMap: DSLResourcesMap,
68 calculationReason: UserStateChangeReason,
69 storeFunc: UserState ⇒ UserState,
70 clogOpt: Option[ContextualLogger] = None
73 val clog = ContextualLogger.fromOther(
76 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
// Local helper: bills the whole month via doFullMonthBilling, then stores the result
// tagged with a MonthlyBillingCalculation reason that wraps the caller's reason.
79 def computeFullMonthBillingAndSaveState(): UserState = {
80 val userState0 = doFullMonthBilling(
89 // We always save the state when it is a full month billing
90 val userState1 = storeFunc.apply(
91 userState0.newWithChangeReason(
92 MonthlyBillingCalculation(calculationReason, billingMonthInfo))
95 clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, userState1.toJsonString)
99 val userID = userStateBootstrap.userID
100 val userCreationMillis = userStateBootstrap.userCreationMillis
101 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
102 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
103 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// The whole billing month lies before the user's creation time.
105 if(billingMonthStopMillis < userCreationMillis) {
106 // If the user did not exist for this billing month, piece of cake
107 clog.debug("User did not exist before %s", userCreationDateCalc)
109 // TODO: The initial user state might have already been created.
110 // First ask if it exists and compute only if not
111 val initialUserState0 = UserState.createInitialUserStateFromBootstrap(
113 TimeHelpers.nowMillis(),
114 InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
117 // We always save the initial state
118 val initialUserState1 = storeFunc.apply(initialUserState0)
120 clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
// Early exit: nothing else to compute for a month in which the user did not exist.
123 return initialUserState1
126 // Ask DB cache for the latest known user state for this billing period
127 val latestUserStateOpt = userStateStoreForRead.findLatestUserStateForFullMonthBilling(
131 latestUserStateOpt match {
133 // Not found, must compute
134 clog.debug("No user state found from cache, will have to (re)compute")
135 val result = computeFullMonthBillingAndSaveState
139 case Some(latestUserState) ⇒
140 // Found a "latest" user state, but we need to check that it really is the one true latest.
141 // For this reason, we must count the events again.
142 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
143 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
145 billingMonthStartMillis,
146 billingMonthStopMillis)
// Positive: events arrived after the stored state was computed; negative: events disappeared.
148 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
152 // NOTE: Keep the caller's calculation reason
153 val result = latestUserState.newWithChangeReason(calculationReason)
157 // We had more, so must recompute
160 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
161 val result = computeFullMonthBillingAndSaveState
// Fewer events than the stored state recorded cannot be explained; fail loudly.
167 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
169 throw new AquariumInternalError(errMsg)
// Debug rendering of a resource event: delegates to the event's own `toDebugString`.
// Used in this class for log messages and passed as the formatter to `debugTheMaps`.
175 protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
176 rcEvent.toDebugString
/**
 * Processes a single resource event against a working copy of the user state.
 *
 * Steps visible below:
 *  1. Look up the resource definition (and thus its cost policy) for the event's resource.
 *     An unknown resource is only logged as a warning.
 *  2. An event that is not billable according to the cost policy is logged and not charged.
 *  3. A billable event whose cost policy needs a previous event, when none is recorded,
 *     is registered as ignored through `userStateWorker.updateIgnored`.
 *  4. Otherwise: compute the new accumulating amount, compute the full chargeslots, derive
 *     the new credits as `oldCredits - sum(chargeslot credits)`, optionally append a
 *     NewWalletEntry to `walletEntriesBuffer`, and update the working state.
 *  5. Record the event as "previous" in the worker and refresh the latest-events snapshot.
 *
 * @param startingUserState    the state the processing starts from
 * @param userStateWorker      mutable worker holding the resources map and the previous /
 *                             ignored event bookkeeping; it is updated by this method
 * @param currentResourceEvent the event to process
 * @param stateChangeReason    decides (via `shouldStoreCalculatedWalletEntries`) whether a
 *                             wallet entry is buffered
 * @param billingMonthInfo     year and month recorded on the wallet entry
 * @param walletEntriesBuffer  output buffer; new wallet entries are appended to it
 * @param clogOpt              optional parent contextual logger
 * @return presumably the updated working user state (the return expression is not among
 *         the lines documented here; TODO confirm)
 * @throws AquariumInternalError when no chargeslots are computed for a billable event
 */
180 def processResourceEvent(
181 startingUserState: UserState,
182 userStateWorker: UserStateWorker,
183 currentResourceEvent: ResourceEventModel,
184 stateChangeReason: UserStateChangeReason,
185 billingMonthInfo: BillingMonthInfo,
186 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
187 clogOpt: Option[ContextualLogger] = None
190 val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
// Working copy; replaced by updated copies as the event is processed.
192 var _workingUserState = startingUserState
194 val theResource = currentResourceEvent.safeResource
195 val theInstanceId = currentResourceEvent.safeInstanceId
196 val theValue = currentResourceEvent.value
198 val resourcesMap = userStateWorker.resourcesMap
200 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
// Paired with the clog.end(...) call at the end of the method.
201 clog.begin(currentResourceEventDebugInfo)
203 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
205 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
206 // But to make this decision, first we need the resource definition (and its cost policy).
207 val dslResourceOpt = resourcesMap.findResource(theResource)
208 dslResourceOpt match {
209 // We have a resource (and thus a cost policy)
210 case Some(dslResource) ⇒
211 val costPolicy = dslResource.costPolicy
212 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
213 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
215 // The resource event is not billable
216 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
218 // The resource event is billable
219 // Find the previous event.
220 // This is (potentially) needed to calculate new credit amount and new resource instance amount
// Note: the lookup also removes the previous event from the worker.
221 val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
// NOTE(review): the log label "PreviousM" looks like a typo; left untouched because it is runtime text.
222 clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
224 val havePreviousResourceEvent = previousResourceEventOpt.isDefined
225 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
226 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
227 // This must be the first resource event of its kind, ever.
228 // TODO: We should normally check the DB to verify the claim (?)
229 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
230 userStateWorker.updateIgnored(currentResourceEvent)
232 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
233 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
234 val oldCredits = _workingUserState.totalCredits
236 // A. Compute new resource instance accumulating amount
237 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
239 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
241 // B. Compute new wallet entries
242 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
243 val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
245 // clog.debug("Computing full chargeslots")
246 val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
247 previousResourceEventOpt,
248 currentResourceEvent,
260 // We have the chargeslots, let's associate them with the current event
261 if(fullChargeslots.length == 0) {
262 // At least one chargeslot is required.
263 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
265 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
267 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): `.get` presumes every chargeslot has its computedCredits defined
// (presumably an Option); an undefined value would throw here. Confirm against
// computeFullChargeslots.
268 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
// Charges are subtracted from the user's total credits.
269 val newCredits = oldCredits - newCreditsDiff
// Wallet entries are only buffered when the state change reason asks for it.
271 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
272 val newWalletEntry = NewWalletEntry(
273 userStateWorker.userID,
277 TimeHelpers.nowMillis(),
279 billingMonthInfo.year,
280 billingMonthInfo.month,
// The entry references the current event and, when there is one, the previous event.
281 if(havePreviousResourceEvent)
282 List(currentResourceEvent, previousResourceEventOpt.get)
284 List(currentResourceEvent),
287 currentResourceEvent.isSynthetic
289 clog.debug("New %s", newWalletEntry)
291 walletEntriesBuffer += newWalletEntry
293 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
296 _workingUserState = _workingUserState.copy(
297 totalCredits = newCredits,
298 stateChangeCounter = _workingUserState.stateChangeCounter + 1
303 // After processing, all events (billable or not) update the previous state
304 userStateWorker.updatePrevious(currentResourceEvent)
// Refresh the snapshot of latest resource events, timestamped with the current time.
306 _workingUserState = _workingUserState.copy(
307 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
310 // We do not have a resource (and thus, no cost policy)
312 // Now, this is a matter of politics: what do we do if no policy was found?
313 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
314 } // dslResourceOpt match
316 clog.end(currentResourceEventDebugInfo)
/**
 * Processes a sequence of resource events by applying `processResourceEvent` to each one
 * in traversal order, threading the working user state from one event to the next.
 *
 * @param resourceEvents      the events to process, in the order they are traversed
 * @param startingUserState   the state the first event is processed against
 * @param userStateWorker     worker passed along for per-event bookkeeping (TODO confirm:
 *                            the arguments of the inner call are only partly shown here)
 * @param stateChangeReason   the reason for the state change
 * @param billingMonthInfo    the billing month being processed
 * @param walletEntriesBuffer buffer for newly computed wallet entries
 * @param clogOpt             optional parent contextual logger
 * @return presumably the state after the last event (the return expression is not among
 *         the lines documented here; TODO confirm)
 */
321 def processResourceEvents(
322 resourceEvents: Traversable[ResourceEventModel],
323 startingUserState: UserState,
324 userStateWorker: UserStateWorker,
325 stateChangeReason: UserStateChangeReason,
326 billingMonthInfo: BillingMonthInfo,
327 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
328 clogOpt: Option[ContextualLogger] = None
// Accumulator: each iteration replaces it with the state produced for that event.
331 var _workingUserState = startingUserState
333 for(currentResourceEvent ← resourceEvents) {
335 _workingUserState = processResourceEvent(
338 currentResourceEvent,
/**
 * Bills a user for a whole billing month.
 *
 * NOTE(review): only the signature and one argument (`billingMonthInfo.monthStopMillis`)
 * are documented here. That argument suggests a delegation to `doMonthBillingUpTo` with
 * the end of the month as the billing end time; confirm against the full body.
 *
 * @param userStateBootstrap  bootstrap data for the user being billed
 * @param billingMonthInfo    the month to bill
 * @param defaultResourcesMap the resources map to use
 * @param calculationReason   the caller's reason for this calculation
 * @param storeFunc           callback that persists a user state
 * @param clogOpt             optional parent contextual logger
 */
349 def doFullMonthBilling(
350 userStateBootstrap: UserStateBootstrap,
351 billingMonthInfo: BillingMonthInfo,
352 defaultResourcesMap: DSLResourcesMap,
353 calculationReason: UserStateChangeReason,
354 storeFunc: UserState ⇒ UserState,
355 clogOpt: Option[ContextualLogger] = None
360 billingMonthInfo.monthStopMillis,
369 def doMonthBillingUpTo(
371 * Which month to bill.
373 billingMonthInfo: BillingMonthInfo,
375 * Bill from start of month up to (and including) this time.
377 billingEndTimeMillis: Long,
378 userStateBootstrap: UserStateBootstrap,
379 defaultResourcesMap: DSLResourcesMap,
380 calculationReason: UserStateChangeReason,
381 storeFunc: UserState ⇒ UserState,
382 clogOpt: Option[ContextualLogger] = None
385 val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
386 val userID = userStateBootstrap.userID
388 val clog = ContextualLogger.fromOther(
391 "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
394 clog.debug("calculationReason = %s", calculationReason)
396 val clogSome = Some(clog)
398 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
400 billingMonthInfo.previousMonth,
407 val startingUserState = previousBillingMonthUserState
409 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
410 // specified in the parameters.
411 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
412 var _workingUserState = startingUserState.newWithChangeReason(calculationReason)
414 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
416 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
418 // First, find and process the actual resource events from DB
419 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
421 billingMonthInfo.monthStartMillis, // from start of month
422 billingEndTimeMillis // to requested time
425 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
427 _workingUserState = processResourceEvents(
428 allResourceEventsForMonth,
437 if(isFullMonthBilling) {
438 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
439 // ... in order to generate an implicit ON later (during the next billing cycle).
440 val (specialEvents, theirImplicitEnds) = userStateWorker.
441 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)
443 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
445 clog.debug("Process implicitly issued events")
446 clog.debugSeq("specialEvents", specialEvents, 0)
447 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
450 // Now, the previous and implicitly started must be our base for the following computation, so we create an
451 // appropriate worker
452 val specialUserStateWorker = UserStateWorker(
453 userStateWorker.userID,
454 LatestResourceEventsWorker.fromList(specialEvents),
455 ImplicitlyIssuedResourceEventsWorker.Empty,
456 IgnoredFirstResourceEventsWorker.Empty,
457 userStateWorker.resourcesMap
460 _workingUserState = processResourceEvents(
463 specialUserStateWorker,
471 val lastUpdateTime = TimeHelpers.nowMillis()
473 _workingUserState = _workingUserState.copy(
474 isFullBillingMonthState = isFullMonthBilling,
476 theFullBillingMonth = if(isFullMonthBilling)
477 Some(billingMonthInfo)
479 _workingUserState.theFullBillingMonth,
481 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
483 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
485 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
487 parentUserStateIDInStore = startingUserState.idInStore,
489 newWalletEntries = newWalletEntries.toList