import scala.collection.mutable
-import gr.grnet.aquarium.store.ResourceEventStore
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
import gr.grnet.aquarium.logic.accounting.Accounting
-import gr.grnet.aquarium.util.date.{TimeHelpers, DateCalculator}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy, DSLAgreement}
+import gr.grnet.aquarium.util.date.DateCalculator
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy}
import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
-
-sealed abstract class CalculationType(_name: String) {
- def name = _name
-}
-
-/**
- * Normal calculations that are part of the bill generation procedure
- */
-case object PeriodicCalculation extends CalculationType("periodic")
-
-/**
- * Adhoc calculations, e.g. when computing the state in realtime.
- */
-case object AdhocCalculation extends CalculationType("adhoc")
-
-trait UserPolicyFinder {
- def findUserPolicyAt(userId: String, whenMillis: Long): DSLPolicy
-}
-
-trait FullStateFinder {
- def findFullState(userId: String, whenMillis: Long): Any
-}
-
-trait UserStateCache {
- def findUserStateAtEndOfPeriod(userId: String, year: Int, month: Int): Maybe[UserState]
-
- /**
- * Find the most up-to-date user state for the particular billing period.
- */
- def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
-}
-
-/**
- * Use this to keep track of implicit OFFs at the end of the billing period.
- *
- * The use case is this: A VM may have been started (ON state) before the end of the billing period
- * and ended (OFF state) after the beginning of the next billing period. In order to bill this, we must assume
- * an implicit OFF even right at the end of the billing period and an implicit ON event with the beginning of the
- * next billing period.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- *
- * @param onEvents The `ON` events that need to be implicitly terminated.
- */
-case class ImplicitOffEvents(onEvents: List[ResourceEvent])
-
-case class OutOfSyncWalletEntries(entries: List[WalletEntry])
-
-/**
- * Full user state at the end of a billing month.
- *
- * @param userState
- * @param implicitOffs
- */
-case class EndOfBillingState(userState: UserState, implicitOffs: ImplicitOffEvents, outOfSyncWalletEntries: OutOfSyncWalletEntries)
+import gr.grnet.aquarium.logic.events.ResourceEvent
+import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, ResourceEventStore}
/**
*
0L,
false,
null,
+ Nil, Nil,Nil,
0L,
ActiveSuspendedSnapshot(false, now),
CreditSnapshot(0, now),
0L,
false,
null,
+ Nil, Nil,Nil,
0L,
ActiveSuspendedSnapshot(false, now),
CreditSnapshot(0, now),
)
}
- /**
- * Get the user state as computed up to (and not including) the start of the new billing period.
- *
- * Always compute, taking into account any "out of sync" resource events
- */
- def computeUserStateAtEndOfBillingPeriod(billingYear: Int,
- billingMonth: Int,
- knownUserState: UserState,
- accounting: Accounting): Maybe[EndOfBillingState] = {
+ def findUserStateAtEndOfBillingMonth(userId: String,
+ yearOfBillingMonth: Int,
+ billingMonth: Int,
+ userStateStore: UserStateStore,
+ resourceEventStore: ResourceEventStore,
+ policyStore: PolicyStore,
+ userCreationMillis: Long,
+ currentUserState: UserState,
+ zeroUserState: UserState,
+ defaultPolicy: DSLPolicy,
+ defaultResourcesMap: DSLResourcesMap,
+ accounting: Accounting): Maybe[UserState] = {
+
+ // Debug-level logging; every message is prefixed with "[userId, year-month]".
+ def D(fmt: String, args: Any*) = {
+   val message = fmt.format(args: _*)
+   logger.debug("[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, message))
+ }
+
+ // Error-level logging; every message is prefixed with "[userId, year-month]".
+ def E(fmt: String, args: Any*) = {
+   val text = fmt.format(args: _*)
+   val line = "[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, text)
+   logger.error(line)
+ }
- val billingDate = new DateCalculator(billingYear, billingMonth, 1)
- val billingDateMillis = billingDate.toMillis
+ // Warning-level logging; every message is prefixed with "[userId, year-month]".
+ // This helper used to call logger.error (copy of E), which reported warnings
+ // at the wrong severity.
+ def W(fmt: String, args: Any*) = {
+   logger.warn("[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, fmt.format(args:_*)))
+ }
-// if(billingDateMillis < knownUserState.startDateMillis) {
-// val userId = knownUserState.userId
-// val agreementName = knownUserState.agreement match {
-// case null ⇒ "default"
-// case agreement ⇒ agreement.data
-// }
-// createFirstUserState(userId, agreementName)
-// } else {
- // We really need to compute the user state here
+ // Runs the full monthly billing for this user and billing month, forwarding
+ // every argument of the enclosing method unchanged.
+ def doCompute: Maybe[UserState] = {
+   D("Computing full month billing")
+   doFullMonthlyBilling(
+     userId = userId,
+     yearOfBillingMonth = yearOfBillingMonth,
+     billingMonth = billingMonth,
+     userStateStore = userStateStore,
+     resourceEventStore = resourceEventStore,
+     policyStore = policyStore,
+     userCreationMillis = userCreationMillis,
+     currentUserState = currentUserState,
+     zeroUserState = zeroUserState,
+     defaultPolicy = defaultPolicy,
+     defaultResourcesMap = defaultResourcesMap,
+     accounting = accounting
+   )
+ }
- // get all events that
- // FIXME: Implement
- Just(EndOfBillingState(knownUserState, ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
+ val billingMonthStartDateCalc = new DateCalculator(yearOfBillingMonth, billingMonth)
+ val billingMonthStartMillis = billingMonthStartDateCalc.toMillis
+ val billingMonthStopMillis = billingMonthStartDateCalc.goEndOfThisMonth.toMillis
-// }
+ // The user has nothing to bill in this month only when the whole month lies before
+ // the user's creation, i.e. the month ends before the creation instant. The previous
+ // test (month start > creation) was inverted: it returned the zero state for users
+ // that already existed when the month began.
+ if(billingMonthStopMillis < userCreationMillis) {
+   // If the user did not exist for this billing month, piece of cake
+   D("User did not exists before %s. Returning %s", new DateCalculator(userCreationMillis), zeroUserState)
+   Just(zeroUserState)
+ } else {
+   resourceEventStore.countOutOfSyncEventsForBillingPeriod(userId, billingMonthStartMillis, billingMonthStopMillis) match {
+     case Just(outOfSyncEventCount) ⇒
+       // Have out of sync, so must recompute
+       // NOTE(review): this branch is taken for any Just(count), including a count of
+       // zero if the store can return one -- confirm the store yields NoVal in that case.
+       D("Found %s out of sync events, will have to (re)compute user state", outOfSyncEventCount)
+       doCompute
+     case NoVal ⇒
+       // No out of sync events, ask DB cache
+       userStateStore.findLatestUserStateForEndOfBillingMonth(userId, yearOfBillingMonth, billingMonth) match {
+         case just @ Just(userState) ⇒
+           // Found from cache
+           D("Found from cache: %s", userState)
+           just
+         case NoVal ⇒
+           // otherwise compute
+           D("No user state found from cache, will have to (re)compute")
+           doCompute
+         case failed @ Failed(_, _) ⇒
+           // Cache lookup failed: report it and hand the failure back to the caller
+           W("Failure while quering cache for user state: %s", failed)
+           failed
+       }
+     case failed @ Failed(_, _) ⇒
+       // Counting out of sync events failed: report it and hand the failure back
+       W("Failure while querying for out of sync events: %s", failed)
+       failed
+   }
+ }
}
-
- def findBillingStateAtEndOfBillingPeriod(yearOfBillingMonth: Int,
- billingMonth: Int,
- userId: String,
- userStateCache: UserStateCache,
- accounting: Accounting): Maybe[EndOfBillingState] = {
- userStateCache.findLatestUserStateForEndOfBillingMonth(userId, yearOfBillingMonth, billingMonth) match {
- case Just(userState) ⇒
+
+ /**
+  * Computes the user state for a full billing month, starting from the user state
+  * found for the end of the previous billing month.
+  *
+  * NOTE(review): this is still work in progress. The result of the `match` below is
+  * discarded, the resource events are fetched but not yet processed, and the body
+  * evaluates to the `null` placeholder.
+  */
+ def doFullMonthlyBilling(userId: String,
+ yearOfBillingMonth: Int,
+ billingMonth: Int,
+ userStateStore: UserStateStore,
+ resourceEventStore: ResourceEventStore,
+ policyStore: PolicyStore,
+ userCreationMillis: Long,
+ currentUserState: UserState,
+ zeroUserState: UserState,
+ defaultPolicy: DSLPolicy,
+ defaultResourcesMap: DSLResourcesMap,
+ accounting: Accounting): Maybe[UserState] = Maybe {
+   // The starting point is the state at the end of the PREVIOUS billing month.
+   // Looking up the same (year, month) here sends findUserStateAtEndOfBillingMonth
+   // straight back into this method whenever it has to compute, so the two would
+   // call each other without end.
+   val previousBillingMonthDateCalc = new DateCalculator(yearOfBillingMonth, billingMonth).goPreviousMonth
+   val yearOfPreviousBillingMonth = previousBillingMonthDateCalc.getYear
+   val previousBillingMonth = previousBillingMonthDateCalc.getMonthOfYear
+
+   val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
+     userId,
+     yearOfPreviousBillingMonth,
+     previousBillingMonth,
+     userStateStore,
+     resourceEventStore,
+     policyStore,
+     userCreationMillis,
+     currentUserState,
+     zeroUserState,
+     defaultPolicy,
+     defaultResourcesMap,
+     accounting
+   )
+
+   previousBillingMonthUserStateM match {
case NoVal ⇒
- case failed @ Failed(e, m) ⇒
+ NoVal // not really...
+ case failed @ Failed(_, _) ⇒
+ failed
+ case Just(startingUserState) ⇒
+ // This is the real deal
+
+ val billingMonthStartDateCalc = new DateCalculator(yearOfBillingMonth, billingMonth)
+ val billingMonthEndDateCalc = billingMonthStartDateCalc.copy.goEndOfThisMonth
+ val billingMonthStartMillis = billingMonthStartDateCalc.toMillis
+ val billingMonthEndMillis = billingMonthEndDateCalc.toMillis
+
+ // Keep the working (current) user state. This will get updated as we proceed billing within the month
+ var _workingUserState = startingUserState
+
+ val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
+ userId,
+ billingMonthStartMillis,
+ billingMonthEndMillis)
}
- Just(EndOfBillingState(createFirstUserState(userId), ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
+ null
}
+
/**
* Find the previous resource event, if needed by the event's cost policy,
* in order to use it for any credit calculations.
newRCEvent: ResourceEvent): Unit = {
previousRCEventsMap(newRCEvent.fullResourceInfo) = newRCEvent
}
-
- /**
- * Do a full month billing.
- *
- * Takes into account "out of sync events".
- *
- */
- def computeFullMonthlyBilling(yearOfBillingMonth: Int,
- billingMonth: Int,
- userId: String,
- policyFinder: UserPolicyFinder,
- fullStateFinder: FullStateFinder,
- userStateCache: UserStateCache,
- rcEventStore: ResourceEventStore,
- currentUserState: UserState,
- otherStuff: Traversable[Any],
- defaultPolicy: DSLPolicy, // Policy.policy
- defaultResourcesMap: DSLResourcesMap,
- accounting: Accounting): Maybe[EndOfBillingState] = Maybe {
-
- val billingMonthStartDate = new DateCalculator(yearOfBillingMonth, billingMonth, 1)
- val billingMonthStopDate = billingMonthStartDate.copy.goEndOfThisMonth
-
- logger.debug("billingMonthStartDate = %s".format(billingMonthStartDate))
- logger.debug("billingMonthStopDate = %s".format(billingMonthStopDate))
-
- val prevBillingMonthStartDate = billingMonthStartDate.copy.goPreviousMonth
- val yearOfPrevBillingMonth = prevBillingMonthStartDate.getYear
- val prevBillingMonth = prevBillingMonthStartDate.getMonthOfYear
-
- // Check if this value is already cached and valid, otherwise compute the value
- // TODO : cache it in case of new computation
- val cachedStartUserStateM = userStateCache.findLatestUserStateForEndOfBillingMonth(
- userId,
- yearOfPrevBillingMonth,
- prevBillingMonth)
-
- val (previousStartUserState, newStartUserState) = cachedStartUserStateM match {
- case Just(cachedStartUserState) ⇒
- // So, we do have a cached user state but must check if this is still valid
- logger.debug("Found cachedStartUserState = %s".format(cachedStartUserState))
-
- // Check how many resource events were used to produce this user state
- val cachedHowmanyRCEvents = cachedStartUserState.resourceEventsCounter
-
- // Ask resource event store to see if we had any "out of sync" events for the particular (== previous)
- // billing period.
- val prevHowmanyOutOfSyncRCEvents = rcEventStore.countOutOfSyncEventsForBillingMonth(
- userId,
- yearOfPrevBillingMonth,
- prevBillingMonth)
- logger.debug("prevHowmanyOutOfSyncRCEvents = %s".format(prevHowmanyOutOfSyncRCEvents))
-
- val recomputedStartUserState = if(prevHowmanyOutOfSyncRCEvents == 0) {
- logger.debug("Not necessary to recompute start user state, using cachedStartUserState")
- // This is good, there were no "out of sync" resource events, so we can use the cached value
- cachedStartUserState
- } else {
- // Oops, there are "out of sync" resource event. Must compute (potentially recursively)
- logger.debug("Recompute start user state...")
- val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
- yearOfPrevBillingMonth,
- prevBillingMonth,
- cachedStartUserState,
- accounting)
- logger.debug("computedUserStateAtStartOfiingPeriodllB = %s".format(computedUserStateAtStartOfBillingPeriod))
- val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
- logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
- recomputedStartUserState
- }
-
- (cachedStartUserState, recomputedStartUserState)
- case NoVal ⇒
- // We do not even have a cached value, so compute one!
- logger.debug("Do not have a cachedStartUserState, computing one...")
- val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
- yearOfPrevBillingMonth,
- prevBillingMonth,
- currentUserState,
- accounting)
- logger.debug("computedUserStateAtStartOfBillingPeriod = %s".format(computedUserStateAtStartOfBillingPeriod))
- val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
- logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
-
- (recomputedStartUserState, recomputedStartUserState)
- case Failed(e, m) ⇒
- logger.error("[Could not find latest user state for billing month %s-%s] %s".format(yearOfPrevBillingMonth, prevBillingMonth, m), e)
- throw new Exception(m, e)
- }
-
- // OK. Now that we have a user state to start with (= start of billing period reference point),
- // let us deal with the events themselves.
- val billingStartMillis = billingMonthStartDate.toMillis
- val billingStopMillis = billingMonthStopDate.toMillis
- val allBillingPeriodRelevantRCEvents = rcEventStore.findAllRelevantResourceEventsForBillingPeriod(userId, billingStartMillis, billingStopMillis)
- logger.debug("allBillingPeriodRelevantRCEvents [%s] = %s".format(allBillingPeriodRelevantRCEvents.size, allBillingPeriodRelevantRCEvents))
-
- type FullResourceType = ResourceEvent.FullResourceType
- // For each type and instance of resource, we keep the previously met resource event.
- val previousRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]()
- // Since we may already have some implicit events from the beginning of the billing period, we put
- // them to the map.
- // TODO:
- val impliedRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]() // those which do not exists but are
- // implied in order to do billing calculations (e.g. the "off" vmtime resource event)
-
- // Our temporary state holder.
- var _workingUserState = newStartUserState
- val nowMillis = TimeHelpers.nowMillis
- var _counter = 0
-
- for(currentResourceEvent <- allBillingPeriodRelevantRCEvents) {
- _counter = _counter + 1
- val resource = currentResourceEvent.resource
- val instanceId = currentResourceEvent.instanceId
-
- logger.debug("%02d. Processing %s".format(_counter, currentResourceEvent.toDebugString(defaultResourcesMap, true)))
- // ResourCe events Debug
- // = = =
- def RCD(fmt: String, args: Any*) = logger.debug(" ⇒ " + fmt.format(args:_*))
-
- RCD("previousRCEventsMap: ")
- for {
- (k, v) <- previousRCEventsMap
- } {
- RCD(" %s ⇒ %s".format(k, v.toDebugString(defaultResourcesMap, true)))
- }
-
- // We need to do these kinds of calculations:
- // 1. Credit state calculations
- // 2. Resource state calculations
-
- // How credits are computed:
- // - "onoff" events (think "vmtime"):
- // - need to be considered in on/off pairs
- // - just use the time difference of this event to the previous one for the credit computation
- // - "discrete" events (think "bandwidth"):
- // - just use their value, which is a difference already for the credit computation
- // - "continuous" events (think "bandwidth"):
- // - need the previous absolute value
- // - need the time difference of this event to the previous one
- // - use both the above (previous absolute value, time difference) for the credit computation
- //
- // BUT ALL THE ABOVE SHOULD NOT BE CONSIDERED HERE; RATHER THEY ARE POLYMORPHIC BEHAVIOURS
-
- // What we need to do is:
- // A. Update user state with new resource instance amount
- // B. Update user state with new credit
- // C. Update ??? state with wallet entries
-
- // The DSLCostPolicy for the resource does not change, so it is safe to use the default DSLPolicy to obtain it.
- val costPolicyOpt = currentResourceEvent.findCostPolicy(defaultResourcesMap)
- costPolicyOpt match {
- case Some(costPolicy) ⇒
- RCD("Found costPolicy = %s".format(costPolicy))
-
- // If this is an event for which no action is required, then OK, proceed with the next one
- // Basically, we do nothing for ON events but we treat everything polymorphically here
- costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value) match {
- case true ⇒
- ///////////////////////////////////////
- // A. Update user state with new resource instance amount
- // TODO: Check if we are at beginning of billing period, so as to use
- // costPolicy.computeResourceInstanceAmountForNewBillingPeriod
- val DefaultResourceInstanceAmount = costPolicy.getResourceInstanceInitialAmount
- RCD("DefaultResourceInstanceAmount = %s".format(DefaultResourceInstanceAmount))
-
- val previousAmount = currentUserState.getResourceInstanceAmount(resource, instanceId, DefaultResourceInstanceAmount)
- RCD("previousAmount = %s".format(previousAmount))
- val newAmount = costPolicy.computeNewResourceInstanceAmount(previousAmount, currentResourceEvent.value)
- RCD("newAmount = %s".format(newAmount))
-
- _workingUserState = _workingUserState.copyForResourcesSnapshotUpdate(resource, instanceId, newAmount, nowMillis)
- // A. Update user state with new resource instance amount
- ///////////////////////////////////////
-
-
- ///////////////////////////////////////
- // B. Update user state with new credit
- val previousRCEventM = findPreviousRCEventOf(currentResourceEvent, costPolicy, previousRCEventsMap)
- _workingUserState.findResourceInstanceSnapshot(resource, instanceId)
- // B. Update user state with new credit
- ///////////////////////////////////////
-
-
- ///////////////////////////////////////
- // C. Update ??? state with wallet entries
-
- // C. Update ??? state with wallet entries
- ///////////////////////////////////////
-
- case false ⇒ // costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value)
- RCD("Ignoring not billabe (%s) %s".format(
- currentResourceEvent.beautifyValue(defaultResourcesMap),
- currentResourceEvent.toDebugString(defaultResourcesMap, true)))
- }
-
- case None ⇒
- () // ERROR
- }
-
-
- updatePreviousRCEventWith(previousRCEventsMap, currentResourceEvent)
- } // for(newResourceEvent <- allBillingPeriodRelevantRCEvents)
-
-
- null
- }
-
-
- /**
- * Runs the billing algorithm on the specified period.
- * By default, a billing period is monthly.
- * The start of the billing period is midnight of the first day of the month we compute the bill for.
- *
- */
- def doPartialMonthlyBilling(startBillingYear: Int,
- startBillingMonth: Int,
- stopBillingMillis: Long,
- userId: String,
- policyFinder: UserPolicyFinder,
- fullStateFinder: FullStateFinder,
- userStateFinder: UserStateCache,
- rcEventStore: ResourceEventStore,
- currentUserState: UserState,
- otherStuff: Traversable[Any],
- accounting: Accounting): Maybe[UserState] = Maybe {
-
-
- null.asInstanceOf[UserState]
- }
}
/** Singleton instance of [[UserStateComputations]] with no members overridden. */
object DefaultUserStateComputations extends UserStateComputations
\ No newline at end of file