2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.user
39 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, ResourceEventStore}
41 import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
44 import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
45 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
46 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
50 * @author Christos KK Loverdos <loverdos@gmail.com>
52 class UserStateComputations extends Loggable {
/**
 * Creates the very first state for a user.
 *
 * Every snapshot visible in this excerpt is built empty or zeroed and is
 * stamped with `now`; the only non-empty piece is a one-element agreement
 * list made from `agreementName`.
 *
 * NOTE(review): the lines that define `now` and that open the enclosing
 * constructor call are not part of this excerpt. Presumably `now` is taken
 * from `millis` and the value built here is a `UserState` -- confirm against
 * the full file.
 *
 * @param userId        id of the user (its use is not visible in this excerpt)
 * @param millis        reference timestamp; defaults to `TimeHelpers.nowMillis`
 * @param agreementName name of the single initial agreement; defaults to
 *                      `DSLAgreement.DefaultAgreementName`
 */
53 def createFirstUserState(userId: String,
54 millis: Long = TimeHelpers.nowMillis,
55 agreementName: String = DSLAgreement.DefaultAgreementName) = {
// No implicitly issued resource events yet.
63 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
// No latest resource events yet.
65 LatestResourceEventsSnapshot(List(), now),
// Created with `false` -- presumably "not active yet"; TODO confirm.
67 ActiveStateSnapshot(false, now),
// Created with 0 -- presumably the initial credit amount; TODO confirm.
68 CreditSnapshot(0, now),
// Exactly one agreement, named after `agreementName`.
69 AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
// No roles and no owned resources.
70 RolesSnapshot(List(), now),
71 OwnedResourcesSnapshot(List(), now)
/**
 * Overload of `createFirstUserState` that additionally accepts a resources map.
 *
 * The snapshots visible in this excerpt are built exactly as in the other
 * overload: all empty or zeroed, stamped with `now`, with a one-element
 * agreement list made from `agreementName`.
 *
 * NOTE(review): neither the definition of `now` nor any use of
 * `resourcesMap` is visible in this excerpt -- confirm against the full file
 * how (or whether) `resourcesMap` influences the result.
 *
 * @param userId        id of the user (its use is not visible in this excerpt)
 * @param agreementName name of the single initial agreement
 * @param resourcesMap  resources map (its use is not visible in this excerpt)
 */
75 def createFirstUserState(userId: String, agreementName: String, resourcesMap: DSLResourcesMap) = {
// No implicitly issued resource events yet.
83 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
// No latest resource events yet.
85 LatestResourceEventsSnapshot(List(), now),
// Created with `false` -- presumably "not active yet"; TODO confirm.
87 ActiveStateSnapshot(false, now),
// Created with 0 -- presumably the initial credit amount; TODO confirm.
88 CreditSnapshot(0, now),
// Exactly one agreement, named after `agreementName`.
89 AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
// No roles and no owned resources.
90 RolesSnapshot(List(), now),
91 OwnedResourcesSnapshot(List(), now)
/**
 * Finds the user state that holds at the end of the given billing month,
 * recomputing it through `doFullMonthlyBilling` when the stored one cannot
 * be trusted.
 *
 * Decision steps visible in this excerpt:
 *  - If the billing month stops before `userCreationMillis`, the result is
 *    `Just(zeroUserState)`.
 *  - Otherwise the latest stored state for that year/month is looked up in
 *    `userStateStore`. If it is not available the state is (re)computed.
 *  - If a stored state is found, its out-of-sync event counter is compared
 *    with a fresh count from `resourceEventStore`. More events than recorded
 *    triggers a recomputation; fewer events than recorded yields a `Failed`
 *    ("DB must be inconsistent").
 *
 * NOTE(review): several `case` headers, the handling after the two
 * `clog.warn` calls, and the branch taken when both counters are equal are
 * not part of this excerpt -- presumably the stored state is returned as-is
 * in the equal case; confirm against the full file.
 *
 * @param billingMonthInfo  the billing month of interest; supplies
 *                          `startMillis`, `stopMillis`, `year` and `month`
 * @param userStateStore    queried for the latest stored state of the month
 * @param resourceEventStore queried for the out-of-sync event count
 * @param userCreationMillis compared against the billing month stop time
 * @param zeroUserState     returned when the user did not exist in that month
 * @param contextualLogger  optional parent logger; defaults to `NoVal`
 * @return the user state wrapped in `Maybe`, or a `Failed` in the error
 *         cases described above
 */
95 def findUserStateAtEndOfBillingMonth(userId: String,
96 billingMonthInfo: BillingMonthInfo,
97 userStateStore: UserStateStore,
98 resourceEventStore: ResourceEventStore,
99 policyStore: PolicyStore,
100 userCreationMillis: Long,
101 currentUserState: UserState,
102 zeroUserState: UserState,
103 defaultPolicy: DSLPolicy,
104 defaultResourcesMap: DSLResourcesMap,
105 accounting: Accounting,
106 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
// Logger scoped to this call, labelled with the billing month.
108 val clog = ContextualLogger.fromOther(
111 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
// Local helper: delegates to doFullMonthlyBilling (its arguments are not part of this excerpt).
114 def doCompute: Maybe[UserState] = {
115 clog.debug("Computing full month billing")
116 doFullMonthlyBilling(
// Only used for the debug message below.
131 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
132 val billingMonthStartMillis = billingMonthInfo.startMillis
133 val billingMonthStopMillis = billingMonthInfo.stopMillis
135 if(billingMonthStopMillis < userCreationMillis) {
136 // If the user did not exist for this billing month, piece of cake
137 clog.debug("User did not exist before %s", userCreationDateCalc)
138 clog.debug("Returning ZERO state %s".format(zeroUserState))
139 clog.endWith(Just(zeroUserState))
141 // Ask DB cache for the latest known user state for this billing period
142 val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
144 billingMonthInfo.year,
145 billingMonthInfo.month)
147 latestUserStateM match {
149 // Not found, must compute
150 clog.debug("No user state found from cache, will have to (re)compute")
151 clog.endWith(doCompute)
153 case failed @ Failed(_, _) ⇒
// NOTE(review): "quering" in the log message below is a typo for "querying";
// it is a runtime string and is therefore left untouched here.
154 clog.warn("Failure while quering cache for user state: %s", failed)
157 case Just(latestUserState) ⇒
158 // Found a "latest" user state but need to see if it is indeed the true and one latest.
159 // For this reason, we must count the events again.
160 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
161 val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
163 billingMonthStartMillis,
164 billingMonthStopMillis)
166 actualOOSEventsCounterM match {
// No counter at all is treated as an error: the store is expected to report at least zero.
168 val errMsg = "No counter computed for out of sync events. Should at least be zero."
170 clog.endWith(Failed(new Exception(errMsg)))
172 case failed @ Failed(_, _) ⇒
173 clog.warn("Failure while querying for out of sync events: %s", failed)
176 case Just(actualOOSEventsCounter) ⇒
// Positive: events arrived after the stored state was computed.
// Negative: the store now reports fewer events than the stored state recorded.
177 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
183 // We had more, so must recompute
186 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
187 clog.endWith(doCompute)
// Fewer events than recorded cannot be explained by late arrivals, so fail.
191 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
193 clog.endWith(Failed(new Exception(errMsg)))
/**
 * Performs the billing computation for one whole billing month.
 *
 * Steps visible in this excerpt:
 *  1. Obtain the user state at the end of the previous billing month via
 *     `findUserStateAtEndOfBillingMonth`; that state is the starting point.
 *  2. Load the resource events relevant to this month from
 *     `resourceEventStore` and process them one by one.
 *  3. For each event, look up its resource definition in
 *     `defaultResourcesMap` to get the cost policy. Non-billable events are
 *     only logged. Billable events are correlated with their previous event
 *     (implicitly terminated events are searched first, explicit previous
 *     events second).
 *  4. If the cost policy needs a previous event and none exists, the event
 *     is recorded as an "ignored first" event. Otherwise the new
 *     accumulating amount, the charge slots, the credit difference and a
 *     `NewWalletEntry` are computed.
 *  5. After each event with a known resource definition, the event is stored
 *     as the latest "previous" event.
 *
 * The whole body is wrapped in `Maybe { ... }`.
 * NOTE(review): presumably this turns the exceptions thrown below (e.g.
 * "No chargeslots computed") into a `Failed` result -- confirm against the
 * Maybe library.
 *
 * NOTE(review): many lines of the original are not part of this excerpt
 * (several `case` headers, the arguments of some calls, any reassignment of
 * `_workingUserState`, what is done with the new wallet entry). Comments
 * below describe only what is visible.
 *
 * @param billingMonthInfo    the month being billed; supplies `startMillis`,
 *                            `stopMillis`, `year`, `month` and `previousMonth`
 * @param resourceEventStore  source of the month's resource events
 * @param defaultResourcesMap used to resolve a resource name to its
 *                            definition and for debug rendering of events
 * @param accounting          computes the full charge slots for an event
 * @param contextualLogger    optional parent logger; defaults to `NoVal`
 * @return the working user state at the end of processing, wrapped in `Maybe`
 */
200 def doFullMonthlyBilling(userId: String,
201 billingMonthInfo: BillingMonthInfo,
202 userStateStore: UserStateStore,
203 resourceEventStore: ResourceEventStore,
204 policyStore: PolicyStore,
205 userCreationMillis: Long,
206 currentUserState: UserState,
207 zeroUserState: UserState,
208 defaultPolicy: DSLPolicy,
209 defaultResourcesMap: DSLResourcesMap,
210 accounting: Accounting,
211 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
// Renders a resource event for log output, using the default resources map.
213 def rcDebugInfo(rcEvent: ResourceEvent) = {
214 rcEvent.toDebugString(defaultResourcesMap, false)
// Logger scoped to this call, labelled with the billing month.
217 val clog = ContextualLogger.fromOther(
220 "doFullMonthlyBilling(%s)", billingMonthInfo)
// The state at the end of the previous month is the starting point for this month.
223 val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
225 billingMonthInfo.previousMonth,
238 previousBillingMonthUserStateM match {
// NOTE(review): this branch yields `null`; the original author's own remark
// on the line below suggests an exception should probably be thrown instead.
240 null // not really... (must throw an exception here probably...)
241 case failed @ Failed(e, _) ⇒
243 case Just(startingUserState) ⇒
244 // This is the real deal
246 // This is a collection of all the latest resource events.
247 // We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
249 // Will be updated on processing the next resource event.
250 val previousResourceEvents = startingUserState.latestResourceEventsSnapshot.toMutableWorker
251 clog.debug("previousResourceEvents = %s", previousResourceEvents)
253 val billingMonthStartMillis = billingMonthInfo.startMillis
254 val billingMonthEndMillis = billingMonthInfo.stopMillis
256 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
257 // specified in the parameters.
258 var _workingUserState = startingUserState
260 // Prepare the implicitly terminated resource events from previous billing period
261 val implicitlyTerminatedResourceEvents = _workingUserState.implicitlyTerminatedSnapshot.toMutableWorker
262 if(implicitlyTerminatedResourceEvents.size > 0) {
263 clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
265 implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
269 // Keep the resource events from this period that were first (and unused) of their kind
270 val ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty
// NOTE(review): the four lines below appear to be the interior of a doc
// comment whose opening and closing delimiters are not part of this excerpt.
273 * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
274 * events and b) the explicit previous resource events. If the event is found, it is removed from the
277 * If the event is not found, then this must be for a new resource instance.
278 * (and probably then some `zero` resource event must be implied as the previous one)
284 def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
285 // implicitly terminated events are checked first
286 implicitlyTerminatedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
287 case just @ Just(_) ⇒
290 // explicit previous resource events are checked second
291 previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
292 case just @ Just(_) ⇒
302 // Find the actual resource events from DB
303 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
305 billingMonthStartMillis,
306 billingMonthEndMillis)
// Counts the events processed so far in the loop below.
307 var _eventCounter = 0
309 clog.debug("resourceEventStore = %s".format(resourceEventStore))
310 if(allResourceEventsForMonth.size > 0) {
311 clog.debug("Found %s resource events, starting processing...", allResourceEventsForMonth.size)
313 clog.debug("Not found any resource events")
// Main loop: one iteration per resource event of the month.
317 currentResourceEvent <- allResourceEventsForMonth
319 _eventCounter = _eventCounter + 1
320 val theResource = currentResourceEvent.safeResource
321 val theInstanceId = currentResourceEvent.safeInstanceId
322 val theValue = currentResourceEvent.value
326 clog.debug("Processing %s", currentResourceEvent)
327 clog.debug("+========= %s", rcDebugInfo(currentResourceEvent))
// Debug dump of the three working collections before the event is handled.
331 if(previousResourceEvents.size > 0) {
332 clog.debug("%s previousResourceEvents", previousResourceEvents.size)
334 previousResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
337 if(implicitlyTerminatedResourceEvents.size > 0) {
338 clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
340 implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
343 if(ignoredFirstResourceEvents.size > 0) {
344 clog.debug("%s ignoredFirstResourceEvents", ignoredFirstResourceEvents.size)
346 ignoredFirstResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
350 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
351 // But to make this decision, first we need the resource definition (and its cost policy).
352 val resourceDefM = defaultResourcesMap.findResourceM(theResource)
354 // We have a resource (and thus a cost policy)
355 case Just(resourceDef) ⇒
356 val costPolicy = resourceDef.costPolicy
357 clog.debug("Cost policy: %s", costPolicy)
358 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
360 // The resource event is not billable
362 clog.debug("Ignoring not billable event %s", rcDebugInfo(currentResourceEvent))
364 // The resource event is billable
366 // Find the previous event.
367 // This is (potentially) needed to calculate new credit amount and new resource instance amount
368 val previousResourceEventM = findAndRemovePreviousResourceEvent(theResource, theInstanceId)
369 clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
371 val havePreviousResourceEvent = previousResourceEventM.isJust
372 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
373 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
374 // This must be the first resource event of its kind, ever.
375 // TODO: We should normally check the DB to verify the claim (?)
376 clog.info("Ignoring first event of its kind %s", rcDebugInfo(currentResourceEvent))
377 ignoredFirstResourceEvents.updateResourceEvent(currentResourceEvent)
// The cost policy's initial amount is passed as the default when reading the stored instance amount.
379 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
380 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
381 val oldCredits = _workingUserState.creditsSnapshot.creditAmount
383 // A. Compute new resource instance accumulating amount
384 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
386 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
388 // B. Compute new wallet entries
389 val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
391 val fullChargeslotsM = accounting.computeFullChargeslots(
392 previousResourceEventM,
393 currentResourceEvent,
400 SimpleCostPolicyAlgorithmCompiler,
405 // We have the chargeslots, let's associate them with the current event
406 fullChargeslotsM match {
407 case Just(fullChargeslots) ⇒
408 if(fullChargeslots.length == 0) {
409 // At least one chargeslot is required.
410 throw new Exception("No chargeslots computed")
412 clog.debug("chargeslots:")
414 for(fullChargeslot <- fullChargeslots) {
415 clog.debug("%s", fullChargeslot)
419 // C. Compute new credit amount (based on the charge slots)
// NOTE(review): `.get` assumes every charge slot carries computed credits -- confirm.
420 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
421 val newCredits = oldCredits + newCreditsDiff
422 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
424 val newWalletEntry = NewWalletEntry(
429 TimeHelpers.nowMillis,
430 billingMonthInfo.year,
431 billingMonthInfo.month,
432 currentResourceEvent,
433 previousResourceEventM.toOption,
438 clog.debug("New %s", newWalletEntry)
441 // At least one chargeslot is required.
442 throw new Exception("No chargeslots computed")
// A failed charge-slot computation is rethrown, keeping the original message and cause.
444 case failed @ Failed(e, m) ⇒
445 throw new Exception(m, e)
451 // After processing, every event, billable or not, updates the previous state
452 previousResourceEvents.updateResourceEvent(currentResourceEvent)
454 // We do not have a resource (and no cost policy)
456 // Now, this is a matter of politics: what do we do if no policy was found?
457 clog.error("No cost policy for %s", rcDebugInfo(currentResourceEvent))
459 // Could not retrieve resource (unlikely to happen)
460 case failed @ Failed(e, m) ⇒
461 clog.error("Error obtaining cost policy for %s", rcDebugInfo(currentResourceEvent))
466 clog.debug("-========= %s", rcDebugInfo(currentResourceEvent))
// The working state is the result of the billing computation.
471 clog.endWith(_workingUserState)