2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.computation
38 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
39 import gr.grnet.aquarium.util.shortClassNameOf
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.computation.state.parts._
42 import gr.grnet.aquarium.event.model.NewWalletEntry
43 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
44 import gr.grnet.aquarium.{AquariumAwareSkeleton, AquariumInternalError}
45 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
46 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
47 import gr.grnet.aquarium.policy.ResourceType
51 * @author Christos KK Loverdos <loverdos@gmail.com>
// Billing computations over user states: locating (or recomputing) the state at the end of a
// billing month and turning resource events into wallet entries and credit changes.
53 final class UserStateComputations extends AquariumAwareSkeleton with Loggable {
// Used by processResourceEvent to compute the charge slots of a resource event.
54 lazy val timeslotComputations = new TimeslotComputations {} // FIXME
// The stores below are resolved lazily from `aquarium`
// (presumably supplied by AquariumAwareSkeleton -- TODO confirm).
55 lazy val policyStore = aquarium.policyStore
// Used for lookups only; the visible methods persist states through their `storeFunc` parameter.
56 lazy val userStateStoreForRead = aquarium.userStateStore
57 lazy val resourceEventStore = aquarium.resourceEventStore
/**
 * Finds the user state at the end of the given billing month, computing and storing it when needed.
 *
 * Outline, as far as the visible code shows:
 *  - If the billing month ends before the user was created, an initial user state is created from
 *    the bootstrap, stored through `storeFunc` and returned.
 *  - Otherwise the latest stored full-month state is looked up through `userStateStoreForRead`.
 *    When there is none, the full month is billed and the result is stored.
 *  - When one is found, the out-of-sync resource events of the month are counted again and compared
 *    with the counter recorded in that state. Depending on the difference the stored state is reused
 *    (with the caller's calculation reason), the month is re-billed, or an error is raised.
 *
 * @param userStateBootstrap      supplies the user ID and the user creation time
 * @param billingMonthInfo        the billing month whose end-of-month state is requested
 * @param defaultResourceTypesMap forwarded to `doFullMonthBilling`
 * @param calculationReason       the caller's reason; recorded in the states produced here
 * @param storeFunc               applied to every state that must be saved; its result is the state used afterwards
 * @param clogOpt                 optional logger (presumably the parent of the contextual logger created here)
 * @throws AquariumInternalError  when fewer out-of-sync events are counted than the stored state recorded
 */
59 def findUserStateAtEndOfBillingMonth(
60 userStateBootstrap: UserStateBootstrap,
61 billingMonthInfo: BillingMonthInfo,
62 defaultResourceTypesMap: Map[String, ResourceType],
63 calculationReason: UserStateChangeReason,
64 storeFunc: UserState ⇒ UserState,
65 clogOpt: Option[ContextualLogger] = None
68 val clog = ContextualLogger.fromOther(
71 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
// Local helper: bills the whole month, then stores the result tagged as a monthly billing calculation.
74 def computeFullMonthBillingAndSaveState(): UserState = {
75 val userState0 = doFullMonthBilling(
78 defaultResourceTypesMap,
84 // We always save the state when it is a full month billing
85 val userState1 = storeFunc.apply(
86 userState0.newWithChangeReason(
87 MonthlyBillingCalculation(calculationReason, billingMonthInfo))
90 clog.debug("Stored full %s %s", billingMonthInfo.toDebugString, userState1.toJsonString)
94 val userID = userStateBootstrap.userID
95 val userCreationMillis = userStateBootstrap.userCreationMillis
96 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
97 val billingMonthStartMillis = billingMonthInfo.monthStartMillis
98 val billingMonthStopMillis = billingMonthInfo.monthStopMillis
// The whole billing month lies before the user's creation time.
100 if(billingMonthStopMillis < userCreationMillis) {
101 // If the user did not exist for this billing month, piece of cake
102 clog.debug("User did not exist before %s", userCreationDateCalc)
104 // TODO: The initial user state might have already been created.
105 // First ask if it exists and compute only if not
106 val initialUserState0 = UserState.createInitialUserStateFromBootstrap(
108 TimeHelpers.nowMillis(),
109 InitialUserStateSetup(Some(calculationReason)) // we record the originating calculation reason
112 // We always save the initial state
113 val initialUserState1 = storeFunc.apply(initialUserState0)
115 clog.debug("Stored initial state = %s", initialUserState1.toJsonString)
118 return initialUserState1
121 // Ask DB cache for the latest known user state for this billing period
122 val latestUserStateOpt = userStateStoreForRead.findLatestUserStateForFullMonthBilling(
126 latestUserStateOpt match {
128 // Not found, must compute
129 clog.debug("No user state found from cache, will have to (re)compute")
130 val result = computeFullMonthBillingAndSaveState
134 case Some(latestUserState) ⇒
135 // Found a "latest" user state but need to see if it is indeed the one true latest.
136 // For this reason, we must count the events again.
137 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
138 val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
140 billingMonthStartMillis,
141 billingMonthStopMillis)
// Positive: more out-of-sync events now than the stored state recorded; negative: fewer.
// NOTE(review): the three branches below are presumably selected on this difference
// (zero / positive / negative); the branch conditions are not visible here -- confirm.
143 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
147 // NOTE: Keep the caller's calculation reason
148 val result = latestUserState.newWithChangeReason(calculationReason)
152 // We had more, so must recompute
155 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
156 val result = computeFullMonthBillingAndSaveState
// Fewer events than previously recorded is treated as an inconsistency and aborts the computation.
162 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
164 throw new AquariumInternalError(errMsg)
/**
 * Debug representation of a resource event, used in this class's log messages and passed
 * to `UserStateWorker.debugTheMaps`.
 *
 * @param rcEvent the resource event to describe
 */
170 protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
171 rcEvent.toDebugString
/**
 * Processes a single resource event against a working copy of the user state.
 *
 * Outline, as far as the visible code shows:
 *  - The resource type of the event is looked up in the worker's resource types map; an unknown
 *    resource is only logged as a warning.
 *  - A non-billable event is ignored for charging purposes.
 *  - For a billable event the previous event of the same resource instance is taken (and removed)
 *    from the worker. If the charging behavior needs a previous event and there is none, either a
 *    dummy first event is constructed at the start of the billing month or the event is recorded
 *    as ignored.
 *  - Charge slots are computed, their credits are summed and subtracted from the total credits,
 *    and a `NewWalletEntry` is handed to `walletEntryRecorder`.
 *  - The event is then recorded in the worker as the "previous" one and the snapshot of latest
 *    resource events in the working state is refreshed.
 *
 * @param startingUserState    the state the computation starts from
 * @param userStateWorker      mutable bookkeeping of previous/ignored events; updated by this method
 * @param currentResourceEvent the event to process
 * @param stateChangeReason    not referenced in the visible lines of this method
 * @param billingMonthInfo     the billing month; provides the dummy first event time and the wallet entry year/month
 * @param walletEntryRecorder  callback invoked with each wallet entry that gets created
 * @param clogOpt              optional logger the contextual logger is derived from
 * @throws AquariumInternalError when no charge slots are computed for a billable event
 */
175 def processResourceEvent(
176 startingUserState: UserState,
177 userStateWorker: UserStateWorker,
178 currentResourceEvent: ResourceEventModel,
179 stateChangeReason: UserStateChangeReason,
180 billingMonthInfo: BillingMonthInfo,
181 walletEntryRecorder: NewWalletEntry ⇒ Unit,
182 clogOpt: Option[ContextualLogger] = None
185 val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
// Working copy; replaced by updated copies as the event is processed.
187 var _workingUserState = startingUserState
189 val theResource = currentResourceEvent.safeResource
190 val theInstanceId = currentResourceEvent.safeInstanceId
191 val theValue = currentResourceEvent.value
192 val theDetails = currentResourceEvent.details
194 val resourceTypesMap = userStateWorker.resourceTypesMap
196 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
197 clog.begin(currentResourceEventDebugInfo)
199 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
201 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
202 // But to make this decision, first we need the resource type (and its charging behavior).
203 resourceTypesMap.get(theResource) match {
204 // We have a resource type (and thus a charging behavior)
205 case Some(resourceType) ⇒
206 val chargingBehavior = resourceType.chargingBehavior
207 clog.debug("%s for %s", chargingBehavior, resourceType)
208 val isBillable = chargingBehavior.isBillableEvent(currentResourceEvent)
210 // The resource event is not billable
211 clog.debug("Ignoring not billable %s", currentResourceEventDebugInfo)
213 // The resource event is billable
214 // Find the previous event.
215 // This is (potentially) needed to calculate new credit amount and new resource instance amount
216 val previousResourceEventOpt0 = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
217 clog.debug("PreviousM %s", previousResourceEventOpt0.map(rcDebugInfo(_)))
219 val havePreviousResourceEvent = previousResourceEventOpt0.isDefined
220 val needPreviousResourceEvent = chargingBehavior.needsPreviousEventForCreditAndAmountCalculation
// Decide whether to charge (`proceed`) and which event acts as the previous one.
222 val (proceed, previousResourceEventOpt1) = if(needPreviousResourceEvent && !havePreviousResourceEvent) {
223 // This must be the first resource event of its kind, ever.
224 // TODO: We should normally check the DB to verify the claim (?)
226 val actualFirstEvent = currentResourceEvent
228 if(chargingBehavior.isBillableFirstEvent(actualFirstEvent) &&
229 chargingBehavior.mustGenerateDummyFirstEvent) {
231 clog.debug("First event of its kind %s", currentResourceEventDebugInfo)
233 // OK. Let's see what the charging behavior decides. If it must generate a dummy first event, we use that.
234 // Otherwise, the current event goes to the ignored list.
235 // The dummy first is considered to exist at the beginning of the billing period
237 val dummyFirst = chargingBehavior.constructDummyFirstEventFor(currentResourceEvent, billingMonthInfo.monthStartMillis)
239 clog.debug("Dummy first companion %s", rcDebugInfo(dummyFirst))
241 // proceed with charging???
242 (true, Some(dummyFirst))
// No dummy first event: the current event is recorded as ignored.
// NOTE(review): the pair returned by this branch is not visible here -- presumably it does not proceed; confirm.
244 clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
245 userStateWorker.updateIgnored(currentResourceEvent)
// Either a previous event exists or the charging behavior does not need one: use what was found.
249 (true, previousResourceEventOpt0)
// The charging behavior's initial amount is the default passed to the amount lookup.
253 val defaultInitialAmount = chargingBehavior.getResourceInstanceInitialAmount
254 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
255 val oldCredits = _workingUserState.totalCredits
257 // A. Compute new resource instance accumulating amount
258 val newAccumulatingAmount = chargingBehavior.computeNewAccumulatingAmount(oldAmount, theValue, theDetails)
260 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAccumulatingAmount, oldCredits)
262 // B. Compute new wallet entries
263 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
264 val alltimeAgreements = _workingUserState.agreementHistory.agreementsByTimeslot
266 // clog.debug("Computing full chargeslots")
267 val (referenceTimeslot, fullChargeslots) = timeslotComputations.computeFullChargeslots(
268 previousResourceEventOpt1,
269 currentResourceEvent,
272 newAccumulatingAmount,
279 // We have the chargeslots, let's associate them with the current event
280 if(fullChargeslots.length == 0) {
281 // At least one chargeslot is required.
282 throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
284 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
286 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): assumes every chargeslot carries computed credits; if `computedCredits` is an
// Option, `.get` throws on an empty one -- confirm.
287 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
// The summed credits of the charge slots are subtracted from the user's total.
288 val newCredits = oldCredits - newCreditsDiff
290 val newWalletEntry = NewWalletEntry(
291 userStateWorker.userID,
295 TimeHelpers.nowMillis(),
297 billingMonthInfo.year,
298 billingMonthInfo.month,
// NOTE(review): `havePreviousResourceEvent` reflects the event found in the worker, so a generated
// dummy first event (present only in previousResourceEventOpt1) is not listed here -- confirm intent.
299 if(havePreviousResourceEvent)
300 List(currentResourceEvent, previousResourceEventOpt1.get)
302 List(currentResourceEvent),
305 currentResourceEvent.isSynthetic
307 clog.debug("%s = %s", shortClassNameOf(newWalletEntry), newWalletEntry)
309 walletEntryRecorder.apply(newWalletEntry)
311 _workingUserState = _workingUserState.copy(
312 totalCredits = newCredits,
313 stateChangeCounter = _workingUserState.stateChangeCounter + 1
318 // After processing, all events billable or not update the previous state
319 userStateWorker.updatePrevious(currentResourceEvent)
// Refresh the snapshot of latest resource events from the worker, stamped with the current time.
321 _workingUserState = _workingUserState.copy(
322 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
325 // We do not have a resource type (and thus, no charging behavior)
327 // Now, this is a matter of politics: what do we do if no resource type was found?
328 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
329 } // resourceTypesMap.get(theResource) match
331 clog.end(currentResourceEventDebugInfo)
/**
 * Processes a sequence of resource events in traversal order, feeding each one to
 * `processResourceEvent` and keeping the state it yields as the new working state.
 *
 * @param resourceEvents      the events to process
 * @param startingUserState   the state the computation starts from
 * @param userStateWorker     bookkeeping of previous/ignored events
 * @param stateChangeReason   the reason for the state change
 * @param billingMonthInfo    the billing month the events are charged in
 * @param walletEntryRecorder callback receiving each wallet entry that gets created
 * @param clogOpt             optional logger
 */
336 def processResourceEvents(
337 resourceEvents: Traversable[ResourceEventModel],
338 startingUserState: UserState,
339 userStateWorker: UserStateWorker,
340 stateChangeReason: UserStateChangeReason,
341 billingMonthInfo: BillingMonthInfo,
342 walletEntryRecorder: NewWalletEntry ⇒ Unit,
343 clogOpt: Option[ContextualLogger] = None
// Accumulator: replaced by the result of each processed event.
346 var _workingUserState = startingUserState
348 for(currentResourceEvent ← resourceEvents) {
350 _workingUserState = processResourceEvent(
353 currentResourceEvent,
/**
 * Bills a complete month for the user.
 *
 * NOTE(review): the callee is not visible here; the visible arguments (the month's stop time and
 * the default resource types map) suggest a delegation to `doMonthBillingUpTo` with the end of
 * the month as the billing end time -- confirm.
 *
 * @param userStateBootstrap      bootstrap data of the user
 * @param billingMonthInfo        the month to bill; its `monthStopMillis` is passed on
 * @param defaultResourceTypesMap resource types map that is passed on
 * @param calculationReason       the reason for the calculation
 * @param storeFunc               function used to store user states
 * @param clogOpt                 optional logger
 */
364 def doFullMonthBilling(
365 userStateBootstrap: UserStateBootstrap,
366 billingMonthInfo: BillingMonthInfo,
367 defaultResourceTypesMap: Map[String, ResourceType],
368 calculationReason: UserStateChangeReason,
369 storeFunc: UserState ⇒ UserState,
370 clogOpt: Option[ContextualLogger] = None
375 billingMonthInfo.monthStopMillis,
377 defaultResourceTypesMap,
/**
 * Bills the given month from its start up to `billingEndTimeMillis`.
 *
 * Outline, as far as the visible code shows:
 *  - The state at the end of the previous month is obtained through
 *    `findUserStateAtEndOfBillingMonth` and becomes the starting state, re-tagged with the
 *    caller's calculation reason.
 *  - Every resource event that occurred in the period is run through `processResourceEvent`;
 *    the wallet entries produced are collected.
 *  - For a full-month billing (end time equal to the month's stop time) the worker is asked for
 *    the generators of implicit end events; when there are any, they are processed with a
 *    dedicated worker.
 *  - Finally the working state is updated with the full-month flag, the snapshots taken from the
 *    worker, an incremented change counter, the parent state ID and the collected wallet entries.
 *
 * The inline comments in the parameter list document `billingMonthInfo` and `billingEndTimeMillis`.
 *
 * @param userStateBootstrap      supplies the user ID
 * @param defaultResourceTypesMap used for the previous-month lookup and to build the worker
 * @param calculationReason       the caller's reason; set on the working state
 * @param storeFunc               function used to store user states
 * @param clogOpt                 optional logger
 */
384 def doMonthBillingUpTo(
386 * Which month to bill.
388 billingMonthInfo: BillingMonthInfo,
390 * Bill from start of month up to (and including) this time.
392 billingEndTimeMillis: Long,
393 userStateBootstrap: UserStateBootstrap,
394 defaultResourceTypesMap: Map[String, ResourceType],
395 calculationReason: UserStateChangeReason,
396 storeFunc: UserState ⇒ UserState,
397 clogOpt: Option[ContextualLogger] = None
// A full-month billing is one that ends exactly at the month's stop time.
400 val isFullMonthBilling = billingEndTimeMillis == billingMonthInfo.monthStopMillis
401 val userID = userStateBootstrap.userID
403 val clog = ContextualLogger.fromOther(
406 "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
409 clog.debug("%s", calculationReason)
411 val clogSome = Some(clog)
// The state at the end of the previous month is the base for billing this month.
413 val previousBillingMonthInfo = billingMonthInfo.previousMonth
414 val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
416 previousBillingMonthInfo,
417 defaultResourceTypesMap,
423 clog.debug("previousBillingMonthUserState(%s) = %s".format(
424 previousBillingMonthInfo.toShortDebugString,
425 previousBillingMonthUserState.toJsonString))
427 val startingUserState = previousBillingMonthUserState
429 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
430 // specified in the parameters.
431 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
432 var _workingUserState = startingUserState.newWithChangeReason(calculationReason)
434 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourceTypesMap)
436 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
438 // First, find and process the actual resource events from DB
// Wallet entries produced while processing are accumulated here through the recorder callback.
439 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
440 val walletEntryRecorder = (nwe: NewWalletEntry) ⇒ {
441 newWalletEntries.append(nwe)
// Number of resource events seen in the period; only used for the debug message below.
444 var _rcEventsCounter = 0
445 resourceEventStore.foreachResourceEventOccurredInPeriod(
447 billingMonthInfo.monthStartMillis, // from start of month
448 billingEndTimeMillis // to requested time
449 ) { currentResourceEvent ⇒
451 clog.debug("Processing %s".format(currentResourceEvent))
453 _workingUserState = processResourceEvent(
456 currentResourceEvent,
463 _rcEventsCounter += 1
466 clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString))
468 if(isFullMonthBilling) {
469 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
470 // ... in order to generate an implicit ON later (during the next billing cycle).
471 val (specialEvents, theirImplicitEnds) = userStateWorker.
472 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthInfo.monthStopMillis)
// True when at least one of the two sequences is non-empty.
474 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
476 clog.debug("Process implicitly issued events")
477 clog.debugSeq("specialEvents", specialEvents, 0)
478 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
481 // Now, the previous and implicitly started must be our base for the following computation, so we create an
482 // appropriate worker
// The special worker starts with `specialEvents` as its latest events, empty implicitly-issued and
// ignored-first sets, and the same user ID and resource types map as the main worker.
483 val specialUserStateWorker = UserStateWorker(
484 userStateWorker.userID,
485 LatestResourceEventsWorker.fromList(specialEvents),
486 ImplicitlyIssuedResourceEventsWorker.Empty,
487 IgnoredFirstResourceEventsWorker.Empty,
488 userStateWorker.resourceTypesMap
491 _workingUserState = processResourceEvents(
494 specialUserStateWorker,
// A single timestamp is shared by both snapshots taken below.
502 val lastUpdateTime = TimeHelpers.nowMillis()
504 _workingUserState = _workingUserState.copy(
505 isFullBillingMonthState = isFullMonthBilling,
// For a partial billing the previously recorded full billing month (if any) is kept.
507 theFullBillingMonth = if(isFullMonthBilling)
508 Some(billingMonthInfo)
510 _workingUserState.theFullBillingMonth,
512 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
514 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
516 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
// The previous month's end state is recorded as the parent of the new state.
518 parentUserStateIDInStore = startingUserState.idInStore,
520 newWalletEntries = newWalletEntries.toList