2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.computation
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.computation.state.parts._
43 import gr.grnet.aquarium.event.model.NewWalletEntry
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
46 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
47 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
51 * @author Christos KK Loverdos <loverdos@gmail.com>
53 final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
// Lazily-initialized collaborators. `aquarium` evaluates the by-name constructor
// argument `_aquarium` on first access and caches the result; the members below
// are in turn derived from it on their own first access.
55 lazy val aquarium = _aquarium
// Entry point to the stores used by this class (policy, user state, resource events).
57 lazy val storeProvider = aquarium.storeProvider
// Anonymous instance of TimeslotComputations; used for charge slot computation.
58 lazy val timeslotComputations = new TimeslotComputations {}
59 lazy val algorithmCompiler = aquarium.algorithmCompiler
60 lazy val policyStore = storeProvider.policyStore
// Named "ForRead": in the visible code it is only queried; writes go through the
// caller-supplied `storeFunc` of the billing methods.
61 lazy val userStateStoreForRead = storeProvider.userStateStore
62 lazy val resourceEventStore = storeProvider.resourceEventStore
// Returns the user state as of the end of the billing month described by
// `billingMonthInfo`, computing and persisting it through `storeFunc` when needed.
//
// Visible flow:
//  - If the billing month ends before the user was created, an initial state is
//    built from `userStateBootstrap`, stored via `storeFunc` and returned.
//  - Otherwise the latest stored full-month state is looked up and validated by
//    comparing its out-of-sync resource event counter with the count reported by
//    the resource event store for the same period.
//
// `calculationReason` is recorded on the produced state, either directly or wrapped
// in MonthlyBillingCalculation / InitialUserStateSetup.
// Throws AquariumInternalError on the "fewer out-of-sync events than recorded" path.
//
// NOTE(review): this excerpt omits several lines (the return type, call arguments,
// the `case` patterns on `counterDiff`, closing braces). Branch conditions described
// here are taken from the surrounding comments and log messages - confirm against
// the full file.
64 def findUserStateAtEndOfBillingMonth(
65 userStateBootstrap: UserStateBootstrap,
66 billingMonthInfo: BillingMonthInfo,
67 defaultResourcesMap: DSLResourcesMap,
68 calculationReason: UserStateChangeReason,
69 storeFunc: UserState ⇒ UserState,
70 clogOpt: Option[ContextualLogger] = None
// Contextual logger labelled with this method name and the short month description.
73 val clog = ContextualLogger.fromOther(
76 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
// Local helper: runs the full-month billing, stamps the result with a
// MonthlyBillingCalculation reason that wraps the caller's reason, and persists it
// through `storeFunc`.
79 def computeFullMonthBillingAndSaveState(): UserState = {
80 val userState0 = doFullMonthBilling(
89 // We always save the state when it is a full month billing
90 val userState1 = storeFunc.apply(
91 userState0.newWithChangeReason(
92 MonthlyBillingCalculation(calculationReason, billingMonthInfo))
95 clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, userState1.toJsonString)
99 val userID = userStateBootstrap.userID
100 val userCreationMillis = userStateBootstrap.userCreationMillis
101 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
102 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
103 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// The whole billing month lies before the user's creation time: nothing to bill.
105 if(billingMonthStopMillis < userCreationMillis) {
106 // If the user did not exist for this billing month, piece of cake
107 clog.debug("User did not exist before %s", userCreationDateCalc)
109 // TODO: The initial user state might have already been created.
110 // First ask if it exists and compute only if not
111 val initialUserState0 = UserState.createInitialUserStateFromBootstrap(
113 TimeHelpers.nowMillis(),
114 InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
117 // We always save the initial state
118 val initialUserState1 = storeFunc.apply(initialUserState0)
120 clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
// Early exit: the stored initial state is the answer for this month.
123 return initialUserState1
126 // Ask DB cache for the latest known user state for this billing period
127 val latestUserStateOpt = userStateStoreForRead.findLatestUserStateForFullMonthBilling(
131 latestUserStateOpt match {
133 // Not found, must compute
134 clog.debug("No user state found from cache, will have to (re)compute")
// NOTE(review): the helper is declared with `()` but invoked without parentheses.
135 val result = computeFullMonthBillingAndSaveState
139 case Some(latestUserState) ⇒
140 // Found a "latest" user state but need to see if it is indeed the true and one latest.
141 // For this reason, we must count the events again.
142 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
143 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
145 billingMonthStartMillis,
146 billingMonthStopMillis)
// Difference between what the event store reports now and what the stored state
// recorded; the branches below dispatch on it (patterns not visible in this excerpt).
148 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
152 // NOTE: Keep the caller's calculation reason
153 latestUserState.newWithChangeReason(calculationReason)
155 // We had more, so must recompute
158 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
159 val result = computeFullMonthBillingAndSaveState
// The store reports fewer out-of-sync events than the saved state recorded; this is
// treated as an inconsistency and raised as an internal error.
165 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
167 throw new AquariumInternalError(errMsg)
// Renders a resource event as its debug string. Used for log messages and passed
// as the event formatter to `userStateWorker.debugTheMaps`.
173 protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
174 rcEvent.toDebugString
// Processes a single resource event against `startingUserState`.
//
// Visible flow:
//  1. Look up the resource definition in `userStateWorker.resourcesMap`; an unknown
//     resource is only logged with a warning.
//  2. For a known resource, ask its cost policy whether the event value is billable.
//  3. For a billable event, fetch (and remove) the previous event for the same
//     resource/instance, compute the new accumulating amount and the full charge
//     slots, and subtract the summed slot credits from `totalCredits`.
//  4. When `stateChangeReason.shouldStoreCalculatedWalletEntries` holds, append a
//     NewWalletEntry to `walletEntriesBuffer` (the buffer is mutated in place).
//  5. For a known resource, record the event as "previous" in `userStateWorker` and
//     refresh `latestResourceEventsSnapshot` on the working state.
//
// Throws AquariumInternalError when no charge slots are computed for a billable event.
//
// NOTE(review): this excerpt omits several lines (return type, some `if`/`else`/`case`
// heads, call arguments, the final result expression). The call site in
// processResourceEvents assigns the result to a UserState variable, so the updated
// working state is presumably what is returned - confirm against the full file.
178 def processResourceEvent(
179 startingUserState: UserState,
180 userStateWorker: UserStateWorker,
181 currentResourceEvent: ResourceEventModel,
182 stateChangeReason: UserStateChangeReason,
183 billingMonthInfo: BillingMonthInfo,
184 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
185 clogOpt: Option[ContextualLogger] = None
188 val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
// Working copy of the state; reassigned through `copy(...)` as processing advances.
190 var _workingUserState = startingUserState
192 val theResource = currentResourceEvent.safeResource
193 val theInstanceId = currentResourceEvent.safeInstanceId
194 val theValue = currentResourceEvent.value
196 val resourcesMap = userStateWorker.resourcesMap
// The same debug string brackets the log output: clog.begin here, clog.end at the end.
198 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
199 clog.begin(currentResourceEventDebugInfo)
201 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
203 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
204 // But to make this decision, first we need the resource definition (and its cost policy).
205 val dslResourceOpt = resourcesMap.findResource(theResource)
206 dslResourceOpt match {
207 // We have a resource (and thus a cost policy)
208 case Some(dslResource) ⇒
209 val costPolicy = dslResource.costPolicy
210 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
211 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
213 // The resource event is not billable
214 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
216 // The resource event is billable
217 // Find the previous event.
218 // This is (potentially) needed to calculate new credit amount and new resource instance amount
// Note that the lookup also removes the previous event from the worker.
219 val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
220 clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
222 val havePreviousResourceEvent = previousResourceEventOpt.isDefined
223 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
// The policy needs a previous event but none exists: the event is logged and
// registered as ignored via `updateIgnored`.
224 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
225 // This must be the first resource event of its kind, ever.
226 // TODO: We should normally check the DB to verify the claim (?)
227 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
228 userStateWorker.updateIgnored(currentResourceEvent)
230 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
231 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
232 val oldCredits = _workingUserState.totalCredits
234 // A. Compute new resource instance accumulating amount
235 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
237 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
239 // B. Compute new wallet entries
240 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
241 val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
243 // clog.debug("Computing full chargeslots")
244 val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
245 previousResourceEventOpt,
246 currentResourceEvent,
258 // We have the chargeslots, let's associate them with the current event
259 if(fullChargeslots.length == 0) {
260 // At least one chargeslot is required.
261 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
263 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
265 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): `computedCredits` is presumably an Option; `.get` would then throw
// for a slot without computed credits - confirm that every slot is populated here.
266 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
// Charges are subtracted from the running credit total.
267 val newCredits = oldCredits - newCreditsDiff
269 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
270 val newWalletEntry = NewWalletEntry(
271 userStateWorker.userID,
275 TimeHelpers.nowMillis(),
277 billingMonthInfo.year,
278 billingMonthInfo.month,
// The entry lists the current event and, when one exists, the previous event too;
// the `.get` below is guarded by `havePreviousResourceEvent`.
279 if(havePreviousResourceEvent)
280 List(currentResourceEvent, previousResourceEventOpt.get)
282 List(currentResourceEvent),
285 currentResourceEvent.isSynthetic
287 clog.debug("New %s", newWalletEntry)
289 walletEntriesBuffer += newWalletEntry
291 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
294 _workingUserState = _workingUserState.copy(
295 totalCredits = newCredits,
296 stateChangeCounter = _workingUserState.stateChangeCounter + 1
301 // After processing, all events billable or not update the previous state
302 userStateWorker.updatePrevious(currentResourceEvent)
// Snapshot of the worker's previous-events map, timestamped with the current time.
304 _workingUserState = _workingUserState.copy(
305 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
308 // We do not have a resource (and thus, no cost policy)
310 // Now, this is a matter of politics: what do we do if no policy was found?
311 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
312 } // dslResourceOpt match
314 clog.end(currentResourceEventDebugInfo)
// Processes a sequence of resource events in traversal order, threading the user
// state through: each call to processResourceEvent receives the state produced by
// the previous one, starting from `startingUserState`.
//
// NOTE(review): the arguments of the inner call and the final result expression are
// not visible in this excerpt; presumably the remaining parameters are forwarded
// unchanged and the last working state is returned - confirm against the full file.
319 def processResourceEvents(
320 resourceEvents: Traversable[ResourceEventModel],
321 startingUserState: UserState,
322 userStateWorker: UserStateWorker,
323 stateChangeReason: UserStateChangeReason,
324 billingMonthInfo: BillingMonthInfo,
325 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
326 clogOpt: Option[ContextualLogger] = None
// Accumulator for the evolving state.
329 var _workingUserState = startingUserState
331 for(currentResourceEvent ← resourceEvents) {
333 _workingUserState = processResourceEvent(
336 currentResourceEvent,
// Bills the whole month described by `billingMonthInfo`: the month's stop time
// (`billingMonthInfo.monthStopMillis`) is passed on as the billing end time.
//
// NOTE(review): the line naming the callee is not visible in this excerpt;
// presumably this delegates to doMonthBillingUpTo, whose second parameter is
// `billingEndTimeMillis` - confirm against the full file.
347 def doFullMonthBilling(
348 userStateBootstrap: UserStateBootstrap,
349 billingMonthInfo: BillingMonthInfo,
350 defaultResourcesMap: DSLResourcesMap,
351 calculationReason: UserStateChangeReason,
352 storeFunc: UserState ⇒ UserState,
353 clogOpt: Option[ContextualLogger] = None
358 billingMonthInfo.monthStopMillis,
367 def doMonthBillingUpTo(
369 * Which month to bill.
371 billingMonthInfo: BillingMonthInfo,
373 * Bill from start of month up to (and including) this time.
375 billingEndTimeMillis: Long,
376 userStateBootstrap: UserStateBootstrap,
377 defaultResourcesMap: DSLResourcesMap,
378 calculationReason: UserStateChangeReason,
379 storeFunc: UserState ⇒ UserState,
380 clogOpt: Option[ContextualLogger] = None
383 val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
384 val userID = userStateBootstrap.userID
386 val clog = ContextualLogger.fromOther(
389 "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
392 clog.debug("calculationReason = %s", calculationReason)
394 val clogSome = Some(clog)
396 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
398 billingMonthInfo.previousMonth,
405 val startingUserState = previousBillingMonthUserState
407 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
408 // specified in the parameters.
409 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
410 var _workingUserState = startingUserState.newWithChangeReason(calculationReason)
412 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
414 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
416 // First, find and process the actual resource events from DB
417 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
419 billingMonthInfo.monthStartMillis, // from start of month
420 billingEndTimeMillis // to requested time
423 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
425 _workingUserState = processResourceEvents(
426 allResourceEventsForMonth,
435 if(isFullMonthBilling) {
436 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
437 // ... in order to generate an implicit ON later (during the next billing cycle).
438 val (specialEvents, theirImplicitEnds) = userStateWorker.
439 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)
441 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
443 clog.debug("Process implicitly issued events")
444 clog.debugSeq("specialEvents", specialEvents, 0)
445 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
448 // Now, the previous and implicitly started must be our base for the following computation, so we create an
449 // appropriate worker
450 val specialUserStateWorker = UserStateWorker(
451 userStateWorker.userID,
452 LatestResourceEventsWorker.fromList(specialEvents),
453 ImplicitlyIssuedResourceEventsWorker.Empty,
454 IgnoredFirstResourceEventsWorker.Empty,
455 userStateWorker.resourcesMap
458 _workingUserState = processResourceEvents(
461 specialUserStateWorker,
469 val lastUpdateTime = TimeHelpers.nowMillis()
471 _workingUserState = _workingUserState.copy(
472 isFullBillingMonthState = isFullMonthBilling,
474 theFullBillingMonth = if(isFullMonthBilling)
475 Some(billingMonthInfo)
477 _workingUserState.theFullBillingMonth,
479 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
481 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
483 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
485 parentUserStateIDInStore = startingUserState.idInStore,
487 newWalletEntries = newWalletEntries.toList