2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.computation
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.computation.data._
44 import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason}
45 import gr.grnet.aquarium.event.model.NewWalletEntry
46 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
47 import gr.grnet.aquarium.{Aquarium, AquariumInternalError, AquariumException}
51 * @author Christos KK Loverdos <loverdos@gmail.com>
// Groups the user state computations found in this file: looking up the user state at the
// end of a billing month, processing resource events and running a full monthly billing.
//
// `_aquarium` is a thunk yielding the Aquarium instance; it is invoked only when the lazy
// `aquarium` value below is first accessed.
53 class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
// Every collaborator is a lazy val, so nothing is resolved until it is first used.
55 protected lazy val aquarium = _aquarium()
56 protected lazy val storeProvider = aquarium.storeProvider
// Anonymous instance of Accounting with an empty body; used below to compute chargeslots.
57 protected lazy val accounting = new Accounting {}
58 protected lazy val algorithmCompiler = aquarium.algorithmCompiler
// The individual stores are all obtained from the same store provider.
59 protected lazy val policyStore = storeProvider.policyStore
60 protected lazy val userStateStore = storeProvider.userStateStore
61 protected lazy val resourceEventStore = storeProvider.resourceEventStore
/**
 * Finds the user state that applies at the end of the given billing month.
 *
 * Visible flow:
 *  - If the billing month stops before the user creation time, an initial user state is
 *    created from `userStateBootstrap` and inserted in the user state store.
 *  - Otherwise the latest stored user state for the billing month is looked up. The number of
 *    out-of-sync resource events recorded in that stored state is compared with the number
 *    currently reported by the resource event store for the billing period, in order to decide
 *    between reusing the stored state (re-tagged with `calculationReason`), recomputing via
 *    `doCompute`, or failing with an exception.
 *
 * NOTE(review): the body of `doCompute`, the `case`/`else` headers and some call arguments are
 * not visible in this listing. The branch conditions described here are inferred from the
 * neighbouring comments and log messages -- confirm against the complete source.
 *
 * @param userStateBootstrap  supplies the user ID and the user creation time, and seeds the
 *                            initial user state
 * @param billingMonthInfo    the billing month; its start/stop millis, year and month are read
 * @param defaultResourcesMap not referenced in the visible lines; presumably consumed by
 *                            `doCompute` -- TODO confirm
 * @param calculationReason   reason applied to a reused stored state through
 *                            `copyForChangeReason`
 * @param clogOpt             optional contextual logger, `None` by default; presumably the
 *                            parent of the local `clog` -- TODO confirm
 * @return the user state for the end of the billing month
 * @throws AquariumException with the message "Found %s out of sync events (%s less). DB must be
 *                           inconsistent"; per that message, when fewer out-of-sync events are
 *                           counted than the stored state recorded
 */
63 def findUserStateAtEndOfBillingMonth(userStateBootstrap: UserStateBootstrappingData,
64 billingMonthInfo: BillingMonthInfo,
65 defaultResourcesMap: DSLResourcesMap,
66 calculationReason: UserStateChangeReason,
67 clogOpt: Option[ContextualLogger] = None): UserState = {
69 val clog = ContextualLogger.fromOther(
72 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
// Local helper invoked further down whenever the user state has to be (re)computed.
75 def doCompute: UserState = {
84 val userID = userStateBootstrap.userID
85 val userCreationMillis = userStateBootstrap.userCreationMillis
// In the visible lines this date calc only feeds the debug message below.
86 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
87 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
88 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// The whole billing month lies before the moment the user was created.
90 if(billingMonthStopMillis < userCreationMillis) {
91 // If the user did not exist for this billing month, piece of cake
92 clog.debug("User did not exist before %s", userCreationDateCalc)
94 // NOTE: Reason here will be: InitialUserStateSetup$
95 val initialUserState0 = UserState.createInitialUserState(userStateBootstrap)
// The value handed back by the store is the one logged (including its _id).
96 val initialUserState1 = userStateStore.insertUserState(initialUserState0)
98 clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
103 // Ask DB cache for the latest known user state for this billing period
104 val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
106 billingMonthInfo.year,
107 billingMonthInfo.month)
109 latestUserStateOpt match {
111 // Not found, must compute
112 clog.debug("No user state found from cache, will have to (re)compute")
113 val result = doCompute
117 case Some(latestUserState) ⇒
118 // Found a "latest" user state but need to see if it is indeed the true and one latest.
119 // For this reason, we must count the events again.
120 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
121 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
123 billingMonthStartMillis,
124 billingMonthStopMillis)
// Positive when more out-of-sync events exist now than the stored state recorded.
126 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
130 // NOTE: Keep the caller's calculation reason
131 latestUserState.copyForChangeReason(calculationReason)
133 // We had more, so must recompute
136 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
137 val result = doCompute
// Per the message, fewer events than previously recorded is treated as an inconsistent DB.
143 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
145 throw new AquariumException(errMsg)
/**
 * Produces the debug representation of a resource event used in the log messages of this class.
 *
 * Delegates to `toDebugString(false)` on the event; the meaning of the `false` flag is defined
 * by `ResourceEventModel` and is not visible here.
 *
 * @param rcEvent the resource event to describe
 * @return the result of `rcEvent.toDebugString(false)`
 */
152 protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
153 rcEvent.toDebugString(false)
/**
 * Processes a single resource event against a working copy of the user state.
 *
 * Visible flow:
 *  - The resource definition for the event is looked up in `userStateWorker.resourcesMap`.
 *    When none is found, a warning "Unknown resource for ..." is logged.
 *  - When found, its cost policy decides, from the event value, whether the event is billable.
 *  - For a billable event the previous event of the same resource and instance is taken
 *    (and removed) from the worker. If the cost policy needs a previous event and there is
 *    none, the event is recorded through `userStateWorker.updateIgnored`.
 *  - Otherwise the new accumulating amount and the full chargeslots are computed, the credits
 *    of all chargeslots are summed and subtracted from the total credits, a `NewWalletEntry` is
 *    appended to `walletEntriesBuffer` if the state change reason asks for it, and the working
 *    state is copied with the new total credits and an incremented state change counter.
 *  - The event is registered as "previous" via `userStateWorker.updatePrevious` and the latest
 *    resource events snapshot of the working state is refreshed.
 *
 * NOTE(review): closing braces, `else` headers, several call arguments and the final result
 * expression are not visible in this listing, so the exact nesting of the steps above should
 * be confirmed against the complete source.
 *
 * @param startingUserState    initial value of the working user state
 * @param userStateWorker      supplies the resources map and the previous/ignored event
 *                             bookkeeping that this method reads and updates
 * @param currentResourceEvent the event being processed
 * @param stateChangeReason    its `shouldStoreCalculatedWalletEntries` flag gates the creation
 *                             of the wallet entry
 * @param billingMonthInfo     its year and month are passed to the new wallet entry
 * @param walletEntriesBuffer  mutable buffer that receives the new wallet entry
 * @param clogOpt              optional parent contextual logger, `None` by default
 * @return a user state; presumably the updated working state -- the result expression is not
 *         visible, TODO confirm
 * @throws AquariumInternalError if no chargeslots are computed for the event
 */
157 def processResourceEvent(startingUserState: UserState,
158 userStateWorker: UserStateWorker,
159 currentResourceEvent: ResourceEventModel,
160 stateChangeReason: UserStateChangeReason,
161 billingMonthInfo: BillingMonthInfo,
162 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
163 clogOpt: Option[ContextualLogger] = None): UserState = {
165 val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
// Mutable working copy; it is replaced by updated copies further down.
167 var _workingUserState = startingUserState
169 val theResource = currentResourceEvent.safeResource
170 val theInstanceId = currentResourceEvent.safeInstanceId
171 val theValue = currentResourceEvent.value
173 val resourcesMap = userStateWorker.resourcesMap
// The same debug string is reused for begin/end and for the messages in between.
175 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
176 clog.begin(currentResourceEventDebugInfo)
178 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
180 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
181 // But to make this decision, first we need the resource definition (and its cost policy).
182 val dslResourceOpt = resourcesMap.findResource(theResource)
183 dslResourceOpt match {
184 // We have a resource (and thus a cost policy)
185 case Some(dslResource) ⇒
186 val costPolicy = dslResource.costPolicy
187 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
188 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
190 // The resource event is not billable
191 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
193 // The resource event is billable
194 // Find the previous event.
195 // This is (potentially) needed to calculate new credit amount and new resource instance amount
196 val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
197 clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
199 val havePreviousResourceEvent = previousResourceEventOpt.isDefined
200 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
201 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
202 // This must be the first resource event of its kind, ever.
203 // TODO: We should normally check the DB to verify the claim (?)
204 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
205 userStateWorker.updateIgnored(currentResourceEvent)
// The cost policy's initial amount is passed as the default to getResourceInstanceAmount.
207 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
208 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
209 val oldCredits = _workingUserState.totalCredits
211 // A. Compute new resource instance accumulating amount
212 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
214 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
216 // B. Compute new wallet entries
217 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
218 val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
220 // clog.debug("Computing full chargeslots")
221 val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
222 previousResourceEventOpt,
223 currentResourceEvent,
235 // We have the chargeslots, let's associate them with the current event
236 if(fullChargeslots.length == 0) {
237 // At least one chargeslot is required.
238 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
240 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
242 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): `.get` presumably unwraps an Option and would throw if a chargeslot carried
// no computed credits -- confirm that computeFullChargeslots always sets them.
243 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
// The summed charge of this event is subtracted from the previous total.
244 val newCredits = oldCredits - newCreditsDiff
246 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
247 val newWalletEntry = NewWalletEntry(
248 userStateWorker.userID,
252 TimeHelpers.nowMillis(),
254 billingMonthInfo.year,
255 billingMonthInfo.month,
// The entry references the current event and, when there is one, the previous event too.
256 if(havePreviousResourceEvent)
257 List(currentResourceEvent, previousResourceEventOpt.get)
259 List(currentResourceEvent),
262 currentResourceEvent.isSynthetic
264 clog.debug("New %s", newWalletEntry)
// Side effect on the caller-supplied buffer.
266 walletEntriesBuffer += newWalletEntry
268 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
271 _workingUserState = _workingUserState.copy(
272 totalCredits = newCredits,
273 stateChangeCounter = _workingUserState.stateChangeCounter + 1
278 // After processing, all events billable or not update the previous state
279 userStateWorker.updatePrevious(currentResourceEvent)
// Refresh the snapshot from the worker's previous events, stamped with the current time.
281 _workingUserState = _workingUserState.copy(
282 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
285 // We do not have a resource (and thus, no cost policy)
287 // Now, this is a matter of politics: what do we do if no policy was found?
288 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
289 } // dslResourceOpt match
291 clog.end(currentResourceEventDebugInfo)
/**
 * Processes a collection of resource events one after the other.
 *
 * Starting from `startingUserState`, each event is handed to `processResourceEvent` and the
 * returned user state becomes the working state for the next event.
 *
 * NOTE(review): most arguments of the `processResourceEvent` call and the final result
 * expression are not visible in this listing; presumably the remaining parameters are
 * forwarded unchanged and the last working state is returned -- TODO confirm.
 *
 * @param resourceEvents      the events to process, in traversal order
 * @param startingUserState   initial value of the working user state
 * @param userStateWorker     worker presumably forwarded to `processResourceEvent`
 * @param stateChangeReason   reason presumably forwarded to `processResourceEvent`
 * @param billingMonthInfo    billing month presumably forwarded to `processResourceEvent`
 * @param walletEntriesBuffer buffer presumably forwarded to `processResourceEvent`
 * @param clogOpt             optional contextual logger, `None` by default
 * @return a user state; presumably the working state after the last event
 */
296 def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
297 startingUserState: UserState,
298 userStateWorker: UserStateWorker,
299 stateChangeReason: UserStateChangeReason,
300 billingMonthInfo: BillingMonthInfo,
301 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
302 clogOpt: Option[ContextualLogger] = None): UserState = {
// Accumulator threaded through the loop below.
304 var _workingUserState = startingUserState
306 for(currentResourceEvent ← resourceEvents) {
// Each event's result replaces the working state.
308 _workingUserState = processResourceEvent(
311 currentResourceEvent,
/**
 * Runs the billing computation for a whole billing month and produces the resulting user state.
 *
 * Visible flow:
 *  1. The user state at the end of the previous billing month is obtained through
 *     `findUserStateAtEndOfBillingMonth` and becomes the starting state.
 *  2. The starting state is copied with the caller's `calculationReason` and a
 *     `UserStateWorker` is built from it.
 *  3. All relevant resource events of the billing period are fetched from the resource event
 *     store and processed with `processResourceEvents`.
 *  4. The worker is asked for the generators of implicit end events at the end of the month;
 *     if any are returned, they are processed with a dedicated worker built from them.
 *  5. The working state is copied with fresh snapshots, an incremented state change counter,
 *     the starting state as parent and the collected wallet entries.
 *  6. If `calculationReason.shouldStoreUserState` holds, the state is inserted in the user
 *     state store and the stored value is used from then on.
 *
 * NOTE(review): several call arguments, closing braces and the final result expression are
 * not visible in this listing -- confirm details against the complete source.
 *
 * @param userStateBootstrap  supplies the user ID; presumably also forwarded to
 *                            `findUserStateAtEndOfBillingMonth` -- TODO confirm
 * @param billingMonthInfo    the month to bill; its previous month, start and stop millis are
 *                            read
 * @param defaultResourcesMap resources map handed to `UserStateWorker.fromUserState`
 * @param calculationReason   reason stamped on the working state; also decides whether the
 *                            result is stored
 * @param clogOpt             optional contextual logger, `None` by default
 * @return a user state; presumably the final working state, as suggested by the "RETURN" log
 *         message
 */
323 def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData,
324 billingMonthInfo: BillingMonthInfo,
325 defaultResourcesMap: DSLResourcesMap,
326 calculationReason: UserStateChangeReason,
327 clogOpt: Option[ContextualLogger] = None): UserState = {
329 val userID = userStateBootstrap.userID
331 val clog = ContextualLogger.fromOther(
334 "doFullMonthlyBilling(%s)", billingMonthInfo.toShortDebugString)
// Wrapped once so it can be passed wherever an Option[ContextualLogger] is expected.
337 val clogSome = Some(clog)
// The previous month's end state is the base for this month; the reason is re-targeted
// to that previous month.
339 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
341 billingMonthInfo.previousMonth,
343 calculationReason.forBillingMonthInfo(billingMonthInfo.previousMonth),
347 val startingUserState = previousBillingMonthUserState
350 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
351 val billingMonthEndMillis = billingMonthInfo.monthStopMillis
353 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
354 // specified in the parameters.
355 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
356 var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
358 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
360 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
362 // First, find and process the actual resource events from DB
363 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
365 billingMonthStartMillis,
366 billingMonthEndMillis)
// Collects the wallet entries; stored on the final state below as newWalletEntries.
368 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
370 _workingUserState = processResourceEvents(
371 allResourceEventsForMonth,
380 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
381 // ... in order to generate an implicit ON later
382 val (specialEvents, theirImplicitEnds) = userStateWorker.
383 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
// lengthCompare(1) >= 0 holds when the collection has at least one element, so this is
// true as soon as either collection is non-empty.
384 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
386 clog.debug("Process implicitly issued events")
387 clog.debugSeq("specialEvents", specialEvents, 0)
388 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
391 // Now, the previous and implicitly started must be our base for the following computation, so we create an
392 // appropriate worker
// Same user and resources map as the main worker, but seeded only with the special events.
393 val specialUserStateWorker = UserStateWorker(
394 userStateWorker.userID,
395 LatestResourceEventsWorker.fromList(specialEvents),
396 ImplicitlyIssuedResourceEventsWorker.Empty,
397 IgnoredFirstResourceEventsWorker.Empty,
398 userStateWorker.resourcesMap
401 _workingUserState = processResourceEvents(
404 specialUserStateWorker,
// One timestamp shared by both snapshots taken below.
411 val lastUpdateTime = TimeHelpers.nowMillis()
413 _workingUserState = _workingUserState.copy(
414 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
415 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
416 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
// Links the new state to the state it was derived from.
417 parentUserStateId = startingUserState.idOpt,
418 newWalletEntries = newWalletEntries.toList
421 clog.debug("calculationReason = %s", calculationReason)
// Persist only when the calculation reason asks for it; continue with the stored value.
423 if(calculationReason.shouldStoreUserState) {
424 val storedUserState = userStateStore.insertUserState(_workingUserState)
425 clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
426 _workingUserState = storedUserState
429 clog.debug("RETURN %s", _workingUserState)