2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.computation
38 import scala.collection.mutable
39 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.computation.data._
44 import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason}
45 import gr.grnet.aquarium.event.model.NewWalletEntry
46 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
47 import gr.grnet.aquarium.{Aquarium, AquariumInternalError, AquariumException}
51 * @author Christos KK Loverdos <loverdos@gmail.com>
53 class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
55 protected lazy val aquarium = _aquarium()
56 protected lazy val storeProvider = aquarium.storeProvider
57 protected lazy val accounting = new Accounting {}
58 protected lazy val algorithmCompiler = aquarium.algorithmCompiler
59 protected lazy val policyStore = storeProvider.policyStore
60 protected lazy val userStateStore = storeProvider.userStateStore
61 protected lazy val resourceEventStore = storeProvider.resourceEventStore
63 def findUserStateAtEndOfBillingMonth(userId: String,
64 billingMonthInfo: BillingMonthInfo,
65 currentUserState: UserState,
66 defaultResourcesMap: DSLResourcesMap,
67 calculationReason: UserStateChangeReason,
68 clogOpt: Option[ContextualLogger] = None): UserState = {
70 val clog = ContextualLogger.fromOther(
73 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
76 def doCompute: UserState = {
86 val userCreationMillis = currentUserState.userCreationMillis
87 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
88 val billingMonthStartMillis = billingMonthInfo.startMillis
89 val billingMonthStopMillis = billingMonthInfo.stopMillis
91 if(billingMonthStopMillis < userCreationMillis) {
92 // If the user did not exist for this billing month, piece of cake
93 clog.debug("User did not exist before %s", userCreationDateCalc)
95 // NOTE: Reason here will be: InitialUserStateSetup$
96 val initialUserState0 = UserState.createInitialUserStateFrom(currentUserState)
97 val initialUserState1 = userStateStore.insertUserState(initialUserState0)
99 clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
104 // Ask DB cache for the latest known user state for this billing period
105 val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
107 billingMonthInfo.year,
108 billingMonthInfo.month)
110 latestUserStateOpt match {
112 // Not found, must compute
113 clog.debug("No user state found from cache, will have to (re)compute")
114 val result = doCompute
118 case Some(latestUserState) ⇒
119 // Found a "latest" user state but need to see if it is indeed the true and one latest.
120 // For this reason, we must count the events again.
121 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
122 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
124 billingMonthStartMillis,
125 billingMonthStopMillis)
127 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
131 // NOTE: Keep the caller's calculation reason
132 latestUserState.copyForChangeReason(calculationReason)
134 // We had more, so must recompute
137 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
138 val result = doCompute
144 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
146 throw new AquariumException(errMsg)
153 def rcDebugInfo(rcEvent: ResourceEventModel) = {
154 rcEvent.toDebugString(false)
159 def processResourceEvent(startingUserState: UserState,
160 userStateWorker: UserStateWorker,
161 currentResourceEvent: ResourceEventModel,
162 stateChangeReason: UserStateChangeReason,
163 billingMonthInfo: BillingMonthInfo,
164 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
165 clogOpt: Option[ContextualLogger] = None): UserState = {
167 val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
169 var _workingUserState = startingUserState
171 val theResource = currentResourceEvent.safeResource
172 val theInstanceId = currentResourceEvent.safeInstanceId
173 val theValue = currentResourceEvent.value
175 val resourcesMap = userStateWorker.resourcesMap
177 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
178 clog.begin(currentResourceEventDebugInfo)
180 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
182 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
183 // But to make this decision, first we need the resource definition (and its cost policy).
184 val dslResourceOpt = resourcesMap.findResource(theResource)
185 dslResourceOpt match {
186 // We have a resource (and thus a cost policy)
187 case Some(dslResource) ⇒
188 val costPolicy = dslResource.costPolicy
189 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
190 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
192 // The resource event is not billable
193 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
195 // The resource event is billable
196 // Find the previous event.
197 // This is (potentially) needed to calculate new credit amount and new resource instance amount
198 val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
199 clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
201 val havePreviousResourceEvent = previousResourceEventOpt.isDefined
202 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
203 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
204 // This must be the first resource event of its kind, ever.
205 // TODO: We should normally check the DB to verify the claim (?)
206 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
207 userStateWorker.updateIgnored(currentResourceEvent)
209 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
210 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
211 val oldCredits = _workingUserState.totalCredits
213 // A. Compute new resource instance accumulating amount
214 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
216 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
218 // B. Compute new wallet entries
219 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
220 val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
222 // clog.debug("Computing full chargeslots")
223 val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
224 previousResourceEventOpt,
225 currentResourceEvent,
237 // We have the chargeslots, let's associate them with the current event
238 if(fullChargeslots.length == 0) {
239 // At least one chargeslot is required.
240 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
242 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
244 // C. Compute new credit amount (based on the charge slots)
245 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
246 val newCredits = oldCredits - newCreditsDiff
248 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
249 val newWalletEntry = NewWalletEntry(
250 userStateWorker.userID,
254 TimeHelpers.nowMillis(),
256 billingMonthInfo.year,
257 billingMonthInfo.month,
258 if(havePreviousResourceEvent)
259 List(currentResourceEvent, previousResourceEventOpt.get)
261 List(currentResourceEvent),
264 currentResourceEvent.isSynthetic
266 clog.debug("New %s", newWalletEntry)
268 walletEntriesBuffer += newWalletEntry
270 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
273 _workingUserState = _workingUserState.copy(
274 totalCredits = newCredits,
275 stateChangeCounter = _workingUserState.stateChangeCounter + 1
280 // After processing, all events billable or not update the previous state
281 userStateWorker.updatePrevious(currentResourceEvent)
283 _workingUserState = _workingUserState.copy(
284 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
287 // We do not have a resource (and thus, no cost policy)
289 // Now, this is a matter of politics: what do we do if no policy was found?
290 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
291 } // dslResourceOpt match
293 clog.end(currentResourceEventDebugInfo)
298 def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
299 startingUserState: UserState,
300 userStateWorker: UserStateWorker,
301 stateChangeReason: UserStateChangeReason,
302 billingMonthInfo: BillingMonthInfo,
303 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
304 clogOpt: Option[ContextualLogger] = None): UserState = {
306 var _workingUserState = startingUserState
308 for(currentResourceEvent ← resourceEvents) {
310 _workingUserState = processResourceEvent(
313 currentResourceEvent,
325 def doFullMonthlyBilling(userId: String,
326 billingMonthInfo: BillingMonthInfo,
327 currentUserState: UserState,
328 defaultResourcesMap: DSLResourcesMap,
329 calculationReason: UserStateChangeReason = NoSpecificChangeReason,
330 clogOpt: Option[ContextualLogger] = None): UserState = {
333 val clog = ContextualLogger.fromOther(
336 "doFullMonthlyBilling(%s)", billingMonthInfo)
339 val clogSome = Some(clog)
341 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
343 billingMonthInfo.previousMonth,
346 calculationReason.forPreviousBillingMonth,
350 val startingUserState = previousBillingMonthUserState
353 val billingMonthStartMillis = billingMonthInfo.startMillis
354 val billingMonthEndMillis = billingMonthInfo.stopMillis
356 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
357 // specified in the parameters.
358 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
359 var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
361 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
363 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
365 // First, find and process the actual resource events from DB
366 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
368 billingMonthStartMillis,
369 billingMonthEndMillis)
371 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
373 _workingUserState = processResourceEvents(
374 allResourceEventsForMonth,
383 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
384 // ... in order to generate an implicit ON later
385 val (specialEvents, theirImplicitEnds) = userStateWorker.
386 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
387 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
389 clog.debug("Process implicitly issued events")
390 clog.debugSeq("specialEvents", specialEvents, 0)
391 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
394 // Now, the previous and implicitly started must be our base for the following computation, so we create an
395 // appropriate worker
396 val specialUserStateWorker = UserStateWorker(
397 userStateWorker.userID,
398 LatestResourceEventsWorker.fromList(specialEvents),
399 ImplicitlyIssuedResourceEventsWorker.Empty,
400 IgnoredFirstResourceEventsWorker.Empty,
401 userStateWorker.resourcesMap
404 _workingUserState = processResourceEvents(
407 specialUserStateWorker,
414 val lastUpdateTime = TimeHelpers.nowMillis()
416 _workingUserState = _workingUserState.copy(
417 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
418 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
419 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
420 parentUserStateId = startingUserState.idOpt,
421 newWalletEntries = newWalletEntries.toList
424 clog.debug("calculationReason = %s", calculationReason)
426 if(calculationReason.shouldStoreUserState) {
427 val storedUserState = userStateStore.insertUserState(_workingUserState)
428 clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
429 _workingUserState = storedUserState
432 clog.debug("RETURN %s", _workingUserState)