From cbb103bbd5bc49c543f990b4d48abb742a791fad Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Tue, 20 Mar 2012 13:16:53 +0200 Subject: [PATCH] Start hooking implicitly issued events --- .../logic/accounting/dsl/DSLCostPolicy.scala | 69 +++++++++++++------- .../aquarium/logic/events/ResourceEvent.scala | 24 ++++++- .../gr/grnet/aquarium/user/UserDataSnapshot.scala | 22 ++++--- .../aquarium/user/UserStateComputations.scala | 56 +++++++++++----- 4 files changed, 121 insertions(+), 50 deletions(-) diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala index 0300ba8..152db71 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala @@ -169,10 +169,10 @@ abstract class DSLCostPolicy(val name: String, val vars: Set[DSLCostPolicyVar]) */ def supportsImplicitEvents: Boolean - def mustConstructImplicitEndEventFor(eventValue: Double): Boolean + def mustConstructImplicitEndEventFor(resourceEvent: ResourceEvent): Boolean @throws(classOf[Exception]) - def constructImplicitEndEventFor(resourceEvent: ResourceEvent): ResourceEvent + def constructImplicitEndEventFor(resourceEvent: ResourceEvent, newOccurredMillis: Long): ResourceEvent @throws(classOf[Exception]) def constructImplicitStartEventFor(resourceEvent: ResourceEvent): ResourceEvent @@ -237,10 +237,6 @@ object DSLCostPolicy { case object OnceCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.once, Set(DSLCostPolicyNameVar, DSLCurrentValueVar)) { - def supportsImplicitEvents = false - - def mustConstructImplicitEndEventFor(eventValue: Double) = false - def isBillableFirstEventBasedOnValue(eventValue: Double) = true def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double) = oldAmount @@ -251,7 +247,11 @@ case object OnceCostPolicy def getValueForCreditCalculation(oldAmountM: Maybe[Double], newEventValue: Double) = Just(newEventValue) - def constructImplicitEndEventFor(resourceEvent: ResourceEvent) = { + def supportsImplicitEvents = false + + def mustConstructImplicitEndEventFor(resourceEvent: ResourceEvent) = false + + def constructImplicitEndEventFor(resourceEvent: ResourceEvent, occurredMillis: Long) = { throw new Exception("constructImplicitEndEventFor() Not compliant with %s".format(this)) } @@ -292,13 +292,25 @@ case object ContinuousCostPolicy } def supportsImplicitEvents = { - false + true + } + + def mustConstructImplicitEndEventFor(resourceEvent: ResourceEvent) = { + true } - def mustConstructImplicitEndEventFor(eventValue: Double) = false + def constructImplicitEndEventFor(resourceEvent: ResourceEvent, newOccurredMillis: Long) = { + assert(supportsImplicitEvents && mustConstructImplicitEndEventFor(resourceEvent)) - def constructImplicitEndEventFor(resourceEvent: ResourceEvent) = { - throw new Exception("constructImplicitEndEventFor() Not compliant with %s".format(this)) + val details = resourceEvent.details + val newDetails = ResourceEvent.setAquariumSyntheticAndImplicitEnd(details) + val newValue = resourceEvent.value + + resourceEvent.copy( + occurredMillis = newOccurredMillis, + details = newDetails, + value = newValue + ) } def constructImplicitStartEventFor(resourceEvent: ResourceEvent) = { @@ -374,7 +386,7 @@ case object OnOffCostPolicy override def isBillableEventBasedOnValue(eventValue: Double) = { // ON events do not contribute, only OFF ones. - OnOffCostPolicyValues.isOFF(eventValue) + OnOffCostPolicyValues.isOFFValue(eventValue) } def isBillableFirstEventBasedOnValue(eventValue: Double) = { @@ -386,14 +398,25 @@ case object OnOffCostPolicy } - def mustConstructImplicitEndEventFor(eventValue: Double) = { + def mustConstructImplicitEndEventFor(resourceEvent: ResourceEvent) = { // If we have ON events with no OFF companions at the end of the billing period, // then we must generate implicit OFF events. - OnOffCostPolicyValues.isON(eventValue) + OnOffCostPolicyValues.isONValue(resourceEvent.value) } - def constructImplicitEndEventFor(resourceEvent: ResourceEvent) = { - throw new Exception("constructImplicitEndEventFor() Not compliant with %s".format(this)) + def constructImplicitEndEventFor(resourceEvent: ResourceEvent, newOccurredMillis: Long) = { + assert(supportsImplicitEvents && mustConstructImplicitEndEventFor(resourceEvent)) + assert(OnOffCostPolicyValues.isONValue(resourceEvent.value)) + + val details = resourceEvent.details + val newDetails = ResourceEvent.setAquariumSyntheticAndImplicitEnd(details) + val newValue = OnOffCostPolicyValues.OFF + + resourceEvent.copy( + occurredMillis = newOccurredMillis, + details = newDetails, + value = newValue + ) } def constructImplicitStartEventFor(resourceEvent: ResourceEvent) = { @@ -402,11 +425,11 @@ case object OnOffCostPolicy } object OnOffCostPolicyValues { - final val ON : Double = 1.0 - final val OFF: Double = 0.0 + final val ON = 1.0 + final val OFF = 0.0 - def isON (value: Double) = value == ON - def isOFF(value: Double) = value == OFF + def isONValue (value: Double) = value == ON + def isOFFValue(value: Double) = value == OFF } /** @@ -445,9 +468,11 @@ case object DiscreteCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.discrete false } - def mustConstructImplicitEndEventFor(eventValue: Double) = false + def mustConstructImplicitEndEventFor(resourceEvent: ResourceEvent) = { + false + } - def constructImplicitEndEventFor(resourceEvent: ResourceEvent) = { + def constructImplicitEndEventFor(resourceEvent: ResourceEvent, occurredMillis: Long) = { throw new Exception("constructImplicitEndEventFor() Not compliant with %s".format(this)) } diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala index b1d42b1..cdd0768 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala @@ -41,6 +41,7 @@ import gr.grnet.aquarium.logic.accounting.dsl._ import com.ckkloverdos.maybe.{MaybeOption, Maybe} import java.util.Date import gr.grnet.aquarium.util.date.MutableDateCalc +import collection.SeqLike /** * Event sent to Aquarium by clients for resource accounting. @@ -233,7 +234,7 @@ object ResourceEvent { type ResourceType = String type ResourceIdType = String type FullResourceType = (ResourceType, ResourceIdType) -// type FullResourceTypeMap = Map[FullResourceType, ResourceEvent] + type FullResourceTypeMap = Map[FullResourceType, ResourceEvent] type FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent] def fromJson(json: String): ResourceEvent = { @@ -252,6 +253,20 @@ object ResourceEvent { fromJValue(Xml.toJson(scala.xml.XML.loadString(xml))) } + def setAquariumSynthetic(map: ResourceEvent.Details): ResourceEvent.Details = { + map.updated(JsonNames.details_aquarium_is_synthetic, "true") + } + + def setAquariumSyntheticAndImplicitEnd(map: ResourceEvent.Details): ResourceEvent.Details = { + map. + updated(JsonNames.details_aquarium_is_synthetic, "true"). + updated(JsonNames.details_aquarium_is_implicit_end, "true") + } + + def sortByOccurred[S <: Seq[ResourceEvent]](events: S with SeqLike[ResourceEvent, S]): S = { + events.sortWith(_.occurredMillis <= _.occurredMillis) + } + object JsonNames { final val _id = "_id" final val id = "id" @@ -264,5 +279,12 @@ object ResourceEvent { final val eventVersion = "eventVersion" final val value = "value" final val details = "details" + + // This is set in the details map to indicate a synthetic resource event (ie not a real one). + // Examples of synthetic resource events are those that are implicitly generated at the + // end of the billing period (e.g. `OFF`s). + final val details_aquarium_is_synthetic = "__aquarium_is_synthetic__" + + final val details_aquarium_is_implicit_end = "__aquarium_is_implicit_end__" } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala index cbb5ef7..8e74097 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala @@ -270,11 +270,11 @@ case class LatestResourceEventsSnapshot(resourceEvents: List[ResourceEvent], /** * This is the mutable cousin of [[gr.grnet.aquarium.user.LatestResourceEventsSnapshot]]. * - * @param resourceEventsMap + * @param latestEventsMap * * @author Christos KK Loverdos */ -case class LatestResourceEventsWorker(resourceEventsMap: FullMutableResourceTypeMap) { +case class LatestResourceEventsWorker(latestEventsMap: FullMutableResourceTypeMap) { /** * The gateway to immutable state. @@ -283,24 +283,24 @@ case class LatestResourceEventsWorker(resourceEventsMap: FullMutableResourceType * @return A fresh instance of [[gr.grnet.aquarium.user.LatestResourceEventsSnapshot]]. */ def toImmutableSnapshot(snapshotTime: Long) = - LatestResourceEventsSnapshot(resourceEventsMap.valuesIterator.toList, snapshotTime) + LatestResourceEventsSnapshot(latestEventsMap.valuesIterator.toList, snapshotTime) def updateResourceEvent(resourceEvent: ResourceEvent): Unit = { - resourceEventsMap((resourceEvent.resource, resourceEvent.instanceId)) = resourceEvent + latestEventsMap((resourceEvent.resource, resourceEvent.instanceId)) = resourceEvent } def findResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { - findFromMapAsMaybe(resourceEventsMap, (resource, instanceId)) + findFromMapAsMaybe(latestEventsMap, (resource, instanceId)) } def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { - findAndRemoveFromMap(resourceEventsMap, (resource, instanceId)) + findAndRemoveFromMap(latestEventsMap, (resource, instanceId)) } - def size = resourceEventsMap.size + def size = latestEventsMap.size def foreach[U](f: ResourceEvent => U): Unit = { - resourceEventsMap.valuesIterator.foreach(f) + latestEventsMap.valuesIterator.foreach(f) } } @@ -339,8 +339,12 @@ case class ImplicitlyIssuedResourceEventsSnapshot(implicitlyIssuedEvents: List[R */ case class ImplicitlyIssuedResourceEventsWorker(implicitlyIssuedEventsMap: FullMutableResourceTypeMap) { + def toList: scala.List[ResourceEvent] = { + implicitlyIssuedEventsMap.valuesIterator.toList + } + def toImmutableSnapshot(snapshotTime: Long) = - ImplicitlyIssuedResourceEventsSnapshot(implicitlyIssuedEventsMap.valuesIterator.toList, snapshotTime) + ImplicitlyIssuedResourceEventsSnapshot(toList, snapshotTime) def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { findAndRemoveFromMap(implicitlyIssuedEventsMap, (resource, instanceId)) diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index 5740909..b3ad76d 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -45,6 +45,7 @@ import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLR import gr.grnet.aquarium.store.{RecordID, StoreProvider, PolicyStore, UserStateStore, ResourceEventStore} import gr.grnet.aquarium.logic.accounting.{Chargeslot, Accounting} import collection.mutable.{Buffer, ListBuffer} +import gr.grnet.aquarium.logic.events.ResourceEvent._ /** * @@ -409,6 +410,8 @@ class UserStateComputations extends Loggable { "doFullMonthlyBilling(%s)", billingMonthInfo) clog.begin() + val clogJ = Just(clog) + val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth( userId, billingMonthInfo.previousMonth, @@ -417,7 +420,7 @@ class UserStateComputations extends Loggable { defaultResourcesMap, accounting, calculationReason.forPreviousBillingMonth, - Just(clog) + clogJ ) if(previousBillingMonthUserStateM.isNoVal) { @@ -441,22 +444,16 @@ class UserStateComputations extends Loggable { // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies var _workingUserState = startingUserState.copyForChangeReason(calculationReason) - val userStateWorker = UserStateWorker.fromUserState(startingUserState, accounting, defaultResourcesMap) + val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap) userStateWorker.debugTheMaps(clog)(rcDebugInfo) - // Find the actual resource events from DB + // First, find and process the actual resource events from DB val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod( userId, billingMonthStartMillis, billingMonthEndMillis) - if(allResourceEventsForMonth.size > 0) { - clog.debug("Found %s resource events, starting processing...", allResourceEventsForMonth.size) - } else { - clog.debug("Not found any resource events") - } - val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]() _workingUserState = processResourceEvents( @@ -467,13 +464,28 @@ class UserStateComputations extends Loggable { calculationReason, billingMonthInfo, newWalletEntries, - Just(clog) + clogJ ) + // Second, for the remaining events which must contribute an implicit OFF, we collect and process + // those OFFs and generate an implicit ON + val allEndEventsBuffer = ListBuffer[ResourceEvent]() + for { + aPreviousEvent <- userStateWorker.allPreviousAndAllImplicitlyStarted + dslResource <- defaultResourcesMap.findResource(aPreviousEvent.safeResource) + costPolicy = dslResource.costPolicy + } { + if(costPolicy.supportsImplicitEvents) { + if(costPolicy.mustConstructImplicitEndEventFor(aPreviousEvent)) { + allEndEventsBuffer append costPolicy.constructImplicitEndEventFor(aPreviousEvent, billingMonthEndMillis) + } + } + } + val lastUpdateTime = TimeHelpers.nowMillis _workingUserState = _workingUserState.copy( - implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedResourceEvents.toImmutableSnapshot(lastUpdateTime), + implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime), latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime), stateChangeCounter = _workingUserState.stateChangeCounter + 1, parentUserStateId = startingUserState.idOpt, @@ -509,8 +521,8 @@ class UserStateComputations extends Loggable { * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time) * ones. Will be updated on processing the next resource event. * - * @param implicitlyIssuedResourceEvents - * The implicitly issued resource events (from previous billing period). + * @param implicitlyIssuedStartEvents + * The implicitly issued resource events at the beginning of the billing period. * * @param ignoredFirstResourceEvents * The resource events that were first (and unused) of their kind. @@ -519,7 +531,7 @@ class UserStateComputations extends Loggable { */ case class UserStateWorker(userId: String, previousResourceEvents: LatestResourceEventsWorker, - implicitlyIssuedResourceEvents: ImplicitlyIssuedResourceEventsWorker, + implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker, ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker, accounting: Accounting, resourcesMap: DSLResourcesMap) { @@ -538,7 +550,7 @@ case class UserStateWorker(userId: String, */ def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = { // implicitly issued events are checked first - implicitlyIssuedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match { + implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match { case just @ Just(_) ⇒ just case NoVal ⇒ @@ -564,11 +576,11 @@ case class UserStateWorker(userId: String, def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = { if(previousResourceEvents.size > 0) { - val map = previousResourceEvents.resourceEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } + val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } clog.debugMap("previousResourceEvents", map, 0) } - if(implicitlyIssuedResourceEvents.size > 0) { - val map = implicitlyIssuedResourceEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } + if(implicitlyIssuedStartEvents.size > 0) { + val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) } clog.debugMap("implicitlyTerminatedResourceEvents", map, 0) } if(ignoredFirstResourceEvents.size > 0) { @@ -577,6 +589,14 @@ case class UserStateWorker(userId: String, } } + def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = { + val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]() + + buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap + buffer ++= previousResourceEvents.latestEventsMap + + buffer.valuesIterator.toList + } } object UserStateWorker { -- 1.7.10.4