/*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
package gr.grnet.aquarium.user
-import scala.collection.mutable
-import gr.grnet.aquarium.store.ResourceEventStore
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
+import scala.collection.mutable
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
+import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
+import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
import gr.grnet.aquarium.logic.accounting.Accounting
-import gr.grnet.aquarium.util.date.{TimeHelpers, DateCalculator}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy, DSLAgreement}
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
-
-sealed abstract class CalculationType(_name: String) {
- def name = _name
-}
-
-/**
- * Normal calculations that are part of the bill generation procedure
- */
-case object PeriodicCalculation extends CalculationType("periodic")
-
-/**
- * Adhoc calculations, e.g. when computing the state in realtime.
- */
-case object AdhocCalculation extends CalculationType("adhoc")
-
-trait UserPolicyFinder {
- def findUserPolicyAt(userId: String, whenMillis: Long): DSLPolicy
-}
-
-trait FullStateFinder {
- def findFullState(userId: String, whenMillis: Long): Any
-}
-
-trait UserStateCache {
- def findUserStateAtEndOfPeriod(userId: String, year: Int, month: Int): Maybe[UserState]
-
- /**
- * Find the most up-to-date user state for the particular billing period.
- */
- def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
-}
+import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.event.NewWalletEntry
+import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
/**
- * Use this to keep track of implicit OFFs at the end of the billing period.
- *
- * The use case is this: A VM may have been started (ON state) before the end of the billing period
- * and ended (OFF state) after the beginning of the next billing period. In order to bill this, we must assume
- * an implicit OFF even right at the end of the billing period and an implicit ON event with the beginning of the
- * next billing period.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
- *
- * @param onEvents The `ON` events that need to be implicitly terminated.
*/
-case class ImplicitOffEvents(onEvents: List[ResourceEvent])
+class UserStateComputations extends Loggable {
+ def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
+ if(!imEvent.isCreateUser) {
+ throw new AquariumInternalError(
+ "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
+ }
-case class OutOfSyncWalletEntries(entries: List[WalletEntry])
+ val userID = imEvent.userID
+ val userCreationMillis = imEvent.occurredMillis
+ val now = TimeHelpers.nowMillis()
-/**
- * Full user state at the end of a billing month.
- *
- * @param userState
- * @param implicitOffs
- */
-case class EndOfBillingState(userState: UserState, implicitOffs: ImplicitOffEvents, outOfSyncWalletEntries: OutOfSyncWalletEntries)
+ UserState(
+ true,
+ userID,
+ userCreationMillis,
+ 0L,
+ false,
+ null,
+ ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+ Nil,
+ Nil,
+ LatestResourceEventsSnapshot(List(), now),
+ 0L,
+ 0L,
+ IMStateSnapshot(imEvent, now),
+ CreditSnapshot(credits, now),
+ AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+ OwnedResourcesSnapshot(Nil, now),
+ Nil,
+ InitialUserStateSetup
+ )
+ }
+
+ def createInitialUserState(userID: String,
+ userCreationMillis: Long,
+ isActive: Boolean,
+ credits: Double,
+ roleNames: List[String] = List(),
+ agreementName: String = DSLAgreement.DefaultAgreementName) = {
+ val now = userCreationMillis
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-class UserStateComputations extends Loggable {
- def createFirstUserState(userId: String, agreementName: String = "default") = {
- val now = 0L
UserState(
- userId,
- now,
+ true,
+ userID,
+ userCreationMillis,
0L,
false,
null,
+ ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+ Nil,
+ Nil,
+ LatestResourceEventsSnapshot(List(), now),
0L,
- ActiveSuspendedSnapshot(false, now),
- CreditSnapshot(0, now),
- AgreementSnapshot(Agreement(agreementName, now, -1) :: Nil, now),
- RolesSnapshot(List(), now),
- OwnedResourcesSnapshot(List(), now)
+ 0L,
+ IMStateSnapshot(
+ StdIMEvent(
+ "",
+ now, now, userID,
+ "",
+ isActive, roleNames.headOption.getOrElse("default"),
+ "1.0",
+ IMEventModel.EventTypeNames.create, Map()),
+ now
+ ),
+ CreditSnapshot(credits, now),
+ AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+ OwnedResourcesSnapshot(Nil, now),
+ Nil,
+ InitialUserStateSetup
)
}
- def createFirstUserState(userId: String, agreementName: String, resourcesMap: DSLResourcesMap) = {
- val now = 0L
- UserState(
+ def createInitialUserStateFrom(us: UserState): UserState = {
+ createInitialUserState(
+ us.imStateSnapshot.imEvent,
+ us.creditsSnapshot.creditAmount,
+ us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
+ }
+
+ def findUserStateAtEndOfBillingMonth(userId: String,
+ billingMonthInfo: BillingMonthInfo,
+ storeProvider: StoreProvider,
+ currentUserState: UserState,
+ defaultResourcesMap: DSLResourcesMap,
+ accounting: Accounting,
+ algorithmCompiler: CostPolicyAlgorithmCompiler,
+ calculationReason: UserStateChangeReason,
+ clogOpt: Option[ContextualLogger] = None): UserState = {
+
+ val clog = ContextualLogger.fromOther(
+ clogOpt,
+ logger,
+ "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
+ clog.begin()
+
+ def doCompute: UserState = {
+ doFullMonthlyBilling(
+ userId,
+ billingMonthInfo,
+ storeProvider,
+ currentUserState,
+ defaultResourcesMap,
+ accounting,
+ algorithmCompiler,
+ calculationReason,
+ Some(clog))
+ }
+
+ val userStateStore = storeProvider.userStateStore
+ val resourceEventStore = storeProvider.resourceEventStore
+
+ val userCreationMillis = currentUserState.userCreationMillis
+ val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
+ val billingMonthStartMillis = billingMonthInfo.startMillis
+ val billingMonthStopMillis = billingMonthInfo.stopMillis
+
+ if(billingMonthStopMillis < userCreationMillis) {
+ // If the user did not exist for this billing month, piece of cake
+ clog.debug("User did not exist before %s", userCreationDateCalc)
+
+ // NOTE: Reason here will be: InitialUserStateSetup$
+ val initialUserState0 = createInitialUserStateFrom(currentUserState)
+ val initialUserState1 = userStateStore.insertUserState(initialUserState0)
+
+ clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
+ clog.end()
+
+ initialUserState1
+ } else {
+ // Ask DB cache for the latest known user state for this billing period
+ val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
userId,
- now,
- 0L,
- false,
- null,
- 0L,
- ActiveSuspendedSnapshot(false, now),
- CreditSnapshot(0, now),
- AgreementSnapshot(Agreement(agreementName, now, - 1) :: Nil, now),
- RolesSnapshot(List(), now),
- OwnedResourcesSnapshot(List(), now)
+ billingMonthInfo.year,
+ billingMonthInfo.month)
+
+ latestUserStateOpt match {
+ case None ⇒
+ // Not found, must compute
+ clog.debug("No user state found from cache, will have to (re)compute")
+ val result = doCompute
+ clog.end()
+ result
+
+ case Some(latestUserState) ⇒
+ // Found a "latest" user state but need to see if it is indeed the true and one latest.
+ // For this reason, we must count the events again.
+ val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+ val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+ userId,
+ billingMonthStartMillis,
+ billingMonthStopMillis)
+
+ val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+ counterDiff match {
+ // ZERO, we are OK!
+ case 0 ⇒
+ // NOTE: Keep the caller's calculation reason
+ latestUserState.copyForChangeReason(calculationReason)
+
+ // We had more, so must recompute
+ case n if n > 0 ⇒
+ clog.debug(
+ "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+ val result = doCompute
+ clog.end()
+ result
+
+ // We had less????
+ case n if n < 0 ⇒
+ val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+ clog.warn(errMsg)
+ throw new AquariumException(errMsg)
+ }
+ }
+ }
+ }
+
+ //+ Utility methods
+ def rcDebugInfo(rcEvent: ResourceEventModel) = {
+ rcEvent.toDebugString(false)
+ }
+ //- Utility methods
+
+ def processResourceEvent(startingUserState: UserState,
+ userStateWorker: UserStateWorker,
+ currentResourceEvent: ResourceEventModel,
+ policyStore: PolicyStore,
+ stateChangeReason: UserStateChangeReason,
+ billingMonthInfo: BillingMonthInfo,
+ walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+ algorithmCompiler: CostPolicyAlgorithmCompiler,
+ clogOpt: Option[ContextualLogger] = None): UserState = {
+
+ val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
+
+ var _workingUserState = startingUserState
+
+ val theResource = currentResourceEvent.safeResource
+ val theInstanceId = currentResourceEvent.safeInstanceId
+ val theValue = currentResourceEvent.value
+
+ val accounting = userStateWorker.accounting
+ val resourcesMap = userStateWorker.resourcesMap
+
+ val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
+ clog.begin(currentResourceEventDebugInfo)
+
+ userStateWorker.debugTheMaps(clog)(rcDebugInfo)
+
+ // Ignore the event if it is not billable (but still record it in the "previous" stuff).
+ // But to make this decision, first we need the resource definition (and its cost policy).
+ val dslResourceOpt = resourcesMap.findResource(theResource)
+ dslResourceOpt match {
+ // We have a resource (and thus a cost policy)
+ case Some(dslResource) ⇒
+ val costPolicy = dslResource.costPolicy
+ clog.debug("Cost policy %s for %s", costPolicy, dslResource)
+ val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
+ if(!isBillable) {
+ // The resource event is not billable
+ clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
+ } else {
+ // The resource event is billable
+ // Find the previous event.
+ // This is (potentially) needed to calculate new credit amount and new resource instance amount
+ val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+ clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
+
+ val havePreviousResourceEvent = previousResourceEventOpt.isDefined
+ val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
+ if(needPreviousResourceEvent && !havePreviousResourceEvent) {
+ // This must be the first resource event of its kind, ever.
+ // TODO: We should normally check the DB to verify the claim (?)
+ clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+ userStateWorker.updateIgnored(currentResourceEvent)
+ } else {
+ val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
+ val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
+ val oldCredits = _workingUserState.creditsSnapshot.creditAmount
+
+ // A. Compute new resource instance accumulating amount
+ val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
+
+ clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
+
+ // B. Compute new wallet entries
+ clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
+ val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
+
+ // clog.debug("Computing full chargeslots")
+ val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+ previousResourceEventOpt,
+ currentResourceEvent,
+ oldCredits,
+ oldAmount,
+ newAmount,
+ dslResource,
+ resourcesMap,
+ alltimeAgreements,
+ algorithmCompiler,
+ policyStore,
+ Some(clog)
+ )
+
+ // We have the chargeslots, let's associate them with the current event
+ if(fullChargeslots.length == 0) {
+ // At least one chargeslot is required.
+ throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
+ }
+ clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+ // C. Compute new credit amount (based on the charge slots)
+ val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+ val newCredits = oldCredits - newCreditsDiff
+
+ if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+ val newWalletEntry = NewWalletEntry(
+ userStateWorker.userId,
+ newCreditsDiff,
+ oldCredits,
+ newCredits,
+ TimeHelpers.nowMillis(),
+ referenceTimeslot,
+ billingMonthInfo.year,
+ billingMonthInfo.month,
+ if(havePreviousResourceEvent)
+ List(currentResourceEvent, previousResourceEventOpt.get)
+ else
+ List(currentResourceEvent),
+ fullChargeslots,
+ dslResource,
+ currentResourceEvent.isSynthetic
+ )
+ clog.debug("New %s", newWalletEntry)
+
+ walletEntriesBuffer += newWalletEntry
+ } else {
+ clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
+ }
+
+ _workingUserState = _workingUserState.copy(
+ creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+ stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+ totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+ )
+ }
+ }
+
+ // After processing, all events billable or not update the previous state
+ userStateWorker.updatePrevious(currentResourceEvent)
+
+ _workingUserState = _workingUserState.copy(
+ latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
+ )
+
+ // We do not have a resource (and thus, no cost policy)
+ case None ⇒
+ // Now, this is a matter of politics: what do we do if no policy was found?
+ clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
+ } // dslResourceOpt match
+
+ clog.end(currentResourceEventDebugInfo)
+
+ _workingUserState
+ }
+
+ def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
+ startingUserState: UserState,
+ userStateWorker: UserStateWorker,
+ policyStore: PolicyStore,
+ stateChangeReason: UserStateChangeReason,
+ billingMonthInfo: BillingMonthInfo,
+ walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+ algorithmCompiler: CostPolicyAlgorithmCompiler,
+ clogOpt: Option[ContextualLogger] = None): UserState = {
+
+ var _workingUserState = startingUserState
+
+ for(currentResourceEvent ← resourceEvents) {
+
+ _workingUserState = processResourceEvent(
+ _workingUserState,
+ userStateWorker,
+ currentResourceEvent,
+ policyStore,
+ stateChangeReason,
+ billingMonthInfo,
+ walletEntriesBuffer,
+ algorithmCompiler,
+ clogOpt
)
}
- /**
- * Get the user state as computed up to (and not including) the start of the new billing period.
- *
- * Always compute, taking into account any "out of sync" resource events
- */
- def computeUserStateAtEndOfBillingPeriod(billingYear: Int,
- billingMonth: Int,
- knownUserState: UserState,
- accounting: Accounting): Maybe[EndOfBillingState] = {
-
- val billingDate = new DateCalculator(billingYear, billingMonth, 1)
- val billingDateMillis = billingDate.toMillis
-
-// if(billingDateMillis < knownUserState.startDateMillis) {
-// val userId = knownUserState.userId
-// val agreementName = knownUserState.agreement match {
-// case null ⇒ "default"
-// case agreement ⇒ agreement.data
-// }
-// createFirstUserState(userId, agreementName)
-// } else {
- // We really need to compute the user state here
-
- // get all events that
- // FIXME: Implement
- Just(EndOfBillingState(knownUserState, ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
-
-// }
+ _workingUserState
}
-
- def findBillingStateAtEndOfBillingPeriod(yearOfBillingMonth: Int,
- billingMonth: Int,
- userId: String,
- userStateCache: UserStateCache,
- accounting: Accounting): Maybe[EndOfBillingState] = {
- userStateCache.findLatestUserStateForEndOfBillingMonth(userId, yearOfBillingMonth, billingMonth) match {
- case Just(userState) ⇒
- case NoVal ⇒
- case failed @ Failed(e, m) ⇒
+
+
+ def doFullMonthlyBilling(userId: String,
+ billingMonthInfo: BillingMonthInfo,
+ storeProvider: StoreProvider,
+ currentUserState: UserState,
+ defaultResourcesMap: DSLResourcesMap,
+ accounting: Accounting,
+ algorithmCompiler: CostPolicyAlgorithmCompiler,
+ calculationReason: UserStateChangeReason = NoSpecificChangeReason,
+ clogOpt: Option[ContextualLogger] = None): UserState = {
+
+
+ val clog = ContextualLogger.fromOther(
+ clogOpt,
+ logger,
+ "doFullMonthlyBilling(%s)", billingMonthInfo)
+ clog.begin()
+
+ val clogSome = Some(clog)
+
+ val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
+ userId,
+ billingMonthInfo.previousMonth,
+ storeProvider,
+ currentUserState,
+ defaultResourcesMap,
+ accounting,
+ algorithmCompiler,
+ calculationReason.forPreviousBillingMonth,
+ clogSome
+ )
+
+ val startingUserState = previousBillingMonthUserState
+
+ val userStateStore = storeProvider.userStateStore
+ val resourceEventStore = storeProvider.resourceEventStore
+ val policyStore = storeProvider.policyStore
+
+ val billingMonthStartMillis = billingMonthInfo.startMillis
+ val billingMonthEndMillis = billingMonthInfo.stopMillis
+
+ // Keep the working (current) user state. This will get updated as we proceed with billing for the month
+ // specified in the parameters.
+ // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
+ var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
+
+ val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
+
+ userStateWorker.debugTheMaps(clog)(rcDebugInfo)
+
+ // First, find and process the actual resource events from DB
+ val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
+ userId,
+ billingMonthStartMillis,
+ billingMonthEndMillis)
+
+ val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
+
+ _workingUserState = processResourceEvents(
+ allResourceEventsForMonth,
+ _workingUserState,
+ userStateWorker,
+ policyStore,
+ calculationReason,
+ billingMonthInfo,
+ newWalletEntries,
+ algorithmCompiler,
+ clogSome
+ )
+
+ // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
+ // ... in order to generate an implicit ON later
+ val (specialEvents, theirImplicitEnds) = userStateWorker.
+ findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
+ if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
+ clog.debug("")
+ clog.debug("Process implicitly issued events")
+ clog.debugSeq("specialEvents", specialEvents, 0)
+ clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
+ }
+
+ // Now, the previous and implicitly started must be our base for the following computation, so we create an
+ // appropriate worker
+ val specialUserStateWorker = UserStateWorker(
+ userStateWorker.userId,
+ LatestResourceEventsWorker.fromList(specialEvents),
+ ImplicitlyIssuedResourceEventsWorker.Empty,
+ IgnoredFirstResourceEventsWorker.Empty,
+ userStateWorker.accounting,
+ userStateWorker.resourcesMap
+ )
+
+ _workingUserState = processResourceEvents(
+ theirImplicitEnds,
+ _workingUserState,
+ specialUserStateWorker,
+ policyStore,
+ calculationReason,
+ billingMonthInfo,
+ newWalletEntries,
+ algorithmCompiler,
+ clogSome
+ )
+
+ val lastUpdateTime = TimeHelpers.nowMillis()
+
+ _workingUserState = _workingUserState.copy(
+ implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
+ latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
+ stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+ parentUserStateId = startingUserState.idOpt,
+ newWalletEntries = newWalletEntries.toList
+ )
+
+ clog.debug("calculationReason = %s", calculationReason)
+
+ if(calculationReason.shouldStoreUserState) {
+ val storedUserState = userStateStore.insertUserState(_workingUserState)
+ clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+ _workingUserState = storedUserState
}
- Just(EndOfBillingState(createFirstUserState(userId), ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
+ clog.debug("RETURN %s", _workingUserState)
+ clog.end()
+ _workingUserState
}
+}
+
+/**
+ * A helper object holding intermediate state/results during resource event processing.
+ *
+ * @param previousResourceEvents
+ * This is a collection of all the latest resource events.
+ * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
+ * ones. Will be updated on processing the next resource event.
+ *
+ * @param implicitlyIssuedStartEvents
+ * The implicitly issued resource events at the beginning of the billing period.
+ *
+ * @param ignoredFirstResourceEvents
+ * The resource events that were first (and unused) of their kind.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class UserStateWorker(userId: String,
+ previousResourceEvents: LatestResourceEventsWorker,
+ implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
+ ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
+ accounting: Accounting,
+ resourcesMap: DSLResourcesMap) {
/**
- * Find the previous resource event, if needed by the event's cost policy,
- * in order to use it for any credit calculations.
+ * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
+ * events and b) the explicit previous resource events. If the event is found, it is removed from the
+ * respective source.
+ *
+ * If the event is not found, then this must be for a new resource instance.
+ * (and probably then some `zero` resource event must be implied as the previous one)
+ *
+ * @param resource
+ * @param instanceId
+ * @return
*/
- def findPreviousRCEventOf(rcEvent: ResourceEvent,
- costPolicy: DSLCostPolicy,
- previousRCEventsMap: mutable.Map[ResourceEvent.FullResourceType, ResourceEvent]): Maybe[ResourceEvent] = {
-
- if(costPolicy.needsPreviousEventForCreditCalculation) {
- // Get a previous resource only if this is needed by the policy
- previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
- case Some(previousRCEvent) ⇒
- Just(previousRCEvent)
- case None ⇒
- queryForPreviousRCEvent(rcEvent)
- }
- } else {
- // No need for previous event. Will return NoVal
- NoVal
+ def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
+ // implicitly issued events are checked first
+ implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+ case some @ Some(_) ⇒
+ some
+ case None ⇒
+ // explicit previous resource events are checked second
+ previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+ case some @ Some(_) ⇒
+ some
+ case _ ⇒
+ None
+ }
}
}
- /**
- * FIXME: implement
- */
- def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
- NoVal
+ def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
+ ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
}
- type FullResourceType = ResourceEvent.FullResourceType
- def updatePreviousRCEventWith(previousRCEventsMap: mutable.Map[FullResourceType, ResourceEvent],
- newRCEvent: ResourceEvent): Unit = {
- previousRCEventsMap(newRCEvent.fullResourceInfo) = newRCEvent
+ def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
+ previousResourceEvents.updateResourceEvent(resourceEvent)
}
+ def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
+ if(previousResourceEvents.size > 0) {
+ val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+ clog.debugMap("previousResourceEvents", map, 0)
+ }
+ if(implicitlyIssuedStartEvents.size > 0) {
+ val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+ clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
+ }
+ if(ignoredFirstResourceEvents.size > 0) {
+ val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+ clog.debugMap("ignoredFirstResourceEvents", map, 0)
+ }
+ }
+
+// private[this]
+// def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
+// val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
+//
+// buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
+// buffer ++= previousResourceEvents.latestEventsMap
+//
+// buffer.valuesIterator.toList
+// }
+
/**
- * Do a full month billing.
+ * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
+ * end events along with those implicitly issued events. Before returning, remove the events that generated the
+ * implicit ends from the internal state of this instance.
*
- * Takes into account "out of sync events".
- *
+ * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
*/
- def computeFullMonthlyBilling(yearOfBillingMonth: Int,
- billingMonth: Int,
- userId: String,
- policyFinder: UserPolicyFinder,
- fullStateFinder: FullStateFinder,
- userStateCache: UserStateCache,
- rcEventStore: ResourceEventStore,
- currentUserState: UserState,
- otherStuff: Traversable[Any],
- defaultPolicy: DSLPolicy, // Policy.policy
- defaultResourcesMap: DSLResourcesMap,
- accounting: Accounting): Maybe[EndOfBillingState] = Maybe {
-
- val billingMonthStartDate = new DateCalculator(yearOfBillingMonth, billingMonth, 1)
- val billingMonthStopDate = billingMonthStartDate.copy.goEndOfThisMonth
-
- logger.debug("billingMonthStartDate = %s".format(billingMonthStartDate))
- logger.debug("billingMonthStopDate = %s".format(billingMonthStopDate))
-
- val prevBillingMonthStartDate = billingMonthStartDate.copy.goPreviousMonth
- val yearOfPrevBillingMonth = prevBillingMonthStartDate.getYear
- val prevBillingMonth = prevBillingMonthStartDate.getMonthOfYear
-
- // Check if this value is already cached and valid, otherwise compute the value
- // TODO : cache it in case of new computation
- val cachedStartUserStateM = userStateCache.findLatestUserStateForEndOfBillingMonth(
- userId,
- yearOfPrevBillingMonth,
- prevBillingMonth)
-
- val (previousStartUserState, newStartUserState) = cachedStartUserStateM match {
- case Just(cachedStartUserState) ⇒
- // So, we do have a cached user state but must check if this is still valid
- logger.debug("Found cachedStartUserState = %s".format(cachedStartUserState))
-
- // Check how many resource events were used to produce this user state
- val cachedHowmanyRCEvents = cachedStartUserState.resourceEventsCounter
-
- // Ask resource event store to see if we had any "out of sync" events for the particular (== previous)
- // billing period.
- val prevHowmanyOutOfSyncRCEvents = rcEventStore.countOutOfSyncEventsForBillingMonth(
- userId,
- yearOfPrevBillingMonth,
- prevBillingMonth)
- logger.debug("prevHowmanyOutOfSyncRCEvents = %s".format(prevHowmanyOutOfSyncRCEvents))
-
- val recomputedStartUserState = if(prevHowmanyOutOfSyncRCEvents == 0) {
- logger.debug("Not necessary to recompute start user state, using cachedStartUserState")
- // This is good, there were no "out of sync" resource events, so we can use the cached value
- cachedStartUserState
- } else {
- // Oops, there are "out of sync" resource event. Must compute (potentially recursively)
- logger.debug("Recompute start user state...")
- val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
- yearOfPrevBillingMonth,
- prevBillingMonth,
- cachedStartUserState,
- accounting)
- logger.debug("computedUserStateAtStartOfiingPeriodllB = %s".format(computedUserStateAtStartOfBillingPeriod))
- val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
- logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
- recomputedStartUserState
- }
-
- (cachedStartUserState, recomputedStartUserState)
- case NoVal ⇒
- // We do not even have a cached value, so compute one!
- logger.debug("Do not have a cachedStartUserState, computing one...")
- val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
- yearOfPrevBillingMonth,
- prevBillingMonth,
- currentUserState,
- accounting)
- logger.debug("computedUserStateAtStartOfBillingPeriod = %s".format(computedUserStateAtStartOfBillingPeriod))
- val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
- logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
-
- (recomputedStartUserState, recomputedStartUserState)
- case Failed(e, m) ⇒
- logger.error("[Could not find latest user state for billing month %s-%s] %s".format(yearOfPrevBillingMonth, prevBillingMonth, m), e)
- throw new Exception(m, e)
- }
+ def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
+ ): (List[ResourceEventModel], List[ResourceEventModel]) = {
+ val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
+ val checkSet = mutable.Set[ResourceEventModel]()
- // OK. Now that we have a user state to start with (= start of billing period reference point),
- // let us deal with the events themselves.
- val billingStartMillis = billingMonthStartDate.toMillis
- val billingStopMillis = billingMonthStopDate.toMillis
- val allBillingPeriodRelevantRCEvents = rcEventStore.findAllRelevantResourceEventsForBillingPeriod(userId, billingStartMillis, billingStopMillis)
- logger.debug("allBillingPeriodRelevantRCEvents [%s] = %s".format(allBillingPeriodRelevantRCEvents.size, allBillingPeriodRelevantRCEvents))
-
- type FullResourceType = ResourceEvent.FullResourceType
- // For each type and instance of resource, we keep the previously met resource event.
- val previousRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]()
- // Since we may already have some implicit events from the beginning of the billing period, we put
- // them to the map.
- // TODO:
- val impliedRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]() // those which do not exists but are
- // implied in order to do billing calculations (e.g. the "off" vmtime resource event)
-
- // Our temporary state holder.
- var _workingUserState = newStartUserState
- val nowMillis = TimeHelpers.nowMillis
- var _counter = 0
-
- for(currentResourceEvent <- allBillingPeriodRelevantRCEvents) {
- _counter = _counter + 1
- val resource = currentResourceEvent.resource
- val instanceId = currentResourceEvent.instanceId
-
- logger.debug("%02d. Processing %s".format(_counter, currentResourceEvent.toDebugString(defaultResourcesMap, true)))
- // ResourCe events Debug
- // = = =
- def RCD(fmt: String, args: Any*) = logger.debug(" ⇒ " + fmt.format(args:_*))
-
- RCD("previousRCEventsMap: ")
+ def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
+ val resourceEvents = map.valuesIterator
for {
- (k, v) <- previousRCEventsMap
+ resourceEvent ← resourceEvents
+ dslResource ← resourcesMap.findResource(resourceEvent.safeResource)
+ costPolicy = dslResource.costPolicy
} {
- RCD(" %s ⇒ %s".format(k, v.toDebugString(defaultResourcesMap, true)))
- }
+ if(costPolicy.supportsImplicitEvents) {
+ if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
+ val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
- // We need to do these kinds of calculations:
- // 1. Credit state calculations
- // 2. Resource state calculations
-
- // How credits are computed:
- // - "onoff" events (think "vmtime"):
- // - need to be considered in on/off pairs
- // - just use the time difference of this event to the previous one for the credit computation
- // - "discrete" events (think "bandwidth"):
- // - just use their value, which is a difference already for the credit computation
- // - "continuous" events (think "bandwidth"):
- // - need the previous absolute value
- // - need the time difference of this event to the previous one
- // - use both the above (previous absolute value, time difference) for the credit computation
- //
- // BUT ALL THE ABOVE SHOULD NOT BE CONSIDERED HERE; RATHER THEY ARE POLYMORPHIC BEHAVIOURS
-
- // What we need to do is:
- // A. Update user state with new resource instance amount
- // B. Update user state with new credit
- // C. Update ??? state with wallet entries
-
- // The DSLCostPolicy for the resource does not change, so it is safe to use the default DSLPolicy to obtain it.
- val costPolicyOpt = currentResourceEvent.findCostPolicy(defaultResourcesMap)
- costPolicyOpt match {
- case Some(costPolicy) ⇒
- RCD("Found costPolicy = %s".format(costPolicy))
-
- // If this is an event for which no action is required, then OK, proceed with the next one
- // Basically, we do nothing for ON events but we treat everything polymorphically here
- costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value) match {
- case true ⇒
- ///////////////////////////////////////
- // A. Update user state with new resource instance amount
- // TODO: Check if we are at beginning of billing period, so as to use
- // costPolicy.computeResourceInstanceAmountForNewBillingPeriod
- val DefaultResourceInstanceAmount = costPolicy.getResourceInstanceInitialAmount
- RCD("DefaultResourceInstanceAmount = %s".format(DefaultResourceInstanceAmount))
-
- val previousAmount = currentUserState.getResourceInstanceAmount(resource, instanceId, DefaultResourceInstanceAmount)
- RCD("previousAmount = %s".format(previousAmount))
- val newAmount = costPolicy.computeNewResourceInstanceAmount(previousAmount, currentResourceEvent.value)
- RCD("newAmount = %s".format(newAmount))
-
- _workingUserState = _workingUserState.copyForResourcesSnapshotUpdate(resource, instanceId, newAmount, nowMillis)
- // A. Update user state with new resource instance amount
- ///////////////////////////////////////
-
-
- ///////////////////////////////////////
- // B. Update user state with new credit
- val previousRCEventM = findPreviousRCEventOf(currentResourceEvent, costPolicy, previousRCEventsMap)
- _workingUserState.findResourceInstanceSnapshot(resource, instanceId)
- // B. Update user state with new credit
- ///////////////////////////////////////
-
-
- ///////////////////////////////////////
- // C. Update ??? state with wallet entries
-
- // C. Update ??? state with wallet entries
- ///////////////////////////////////////
-
- case false ⇒ // costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value)
- RCD("Ignoring not billabe (%s) %s".format(
- currentResourceEvent.beautifyValue(defaultResourcesMap),
- currentResourceEvent.toDebugString(defaultResourcesMap, true)))
- }
+ if(!checkSet.contains(resourceEvent)) {
+ checkSet.add(resourceEvent)
+ buffer append ((resourceEvent, implicitEnd))
+ }
- case None ⇒
- () // ERROR
+ // remove it anyway
+ map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
+ }
+ }
}
+ }
+ doItFor(previousResourceEvents.latestEventsMap) // we give priority for previous
+ doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
- updatePreviousRCEventWith(previousRCEventsMap, currentResourceEvent)
- } // for(newResourceEvent <- allBillingPeriodRelevantRCEvents)
-
-
- null
+ (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
}
-
-
- /**
- * Runs the billing algorithm on the specified period.
- * By default, a billing period is monthly.
- * The start of the billing period is midnight of the first day of the month we compute the bill for.
- *
- */
- def doPartialMonthlyBilling(startBillingYear: Int,
- startBillingMonth: Int,
- stopBillingMillis: Long,
- userId: String,
- policyFinder: UserPolicyFinder,
- fullStateFinder: FullStateFinder,
- userStateFinder: UserStateCache,
- rcEventStore: ResourceEventStore,
- currentUserState: UserState,
- otherStuff: Traversable[Any],
- accounting: Accounting): Maybe[UserState] = Maybe {
-
-
- null.asInstanceOf[UserState]
- }
}
-object DefaultUserStateComputations extends UserStateComputations
\ No newline at end of file
+object UserStateWorker {
+ def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
+ UserStateWorker(
+ userState.userID,
+ userState.latestResourceEventsSnapshot.toMutableWorker,
+ userState.implicitlyIssuedSnapshot.toMutableWorker,
+ IgnoredFirstResourceEventsWorker.Empty,
+ accounting,
+ resourcesMap
+ )
+ }
+}