From 5eb1a5f04c95cacafc2e29b5070a9f6941b259e7 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Mon, 30 Jan 2012 15:21:52 +0200 Subject: [PATCH] COntinue with billing code and start cleaning up user actor --- .../aquarium/logic/accounting/Accounting.scala | 10 +- .../logic/accounting/dsl/DSLCostPolicy.scala | 61 ++++++----- .../aquarium/logic/events/ResourceEvent.scala | 15 +++ .../gr/grnet/aquarium/user/UserDataSnapshot.scala | 19 ++-- .../aquarium/user/UserStateComputations.scala | 55 ++++++---- .../gr/grnet/aquarium/user/actor/UserActor.scala | 106 ++------------------ 6 files changed, 114 insertions(+), 152 deletions(-) diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala index b3d247c..b6fda15 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala @@ -49,7 +49,7 @@ import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just} */ trait Accounting extends DSLUtils with Loggable { - def chargeEvent( oldResourceEvent: ResourceEvent, + def chargeEvent2( oldResourceEventM: Maybe[ResourceEvent], newResourceEvent: ResourceEvent, dslAgreement: DSLAgreement, lastSnapshotDate: Date, @@ -64,7 +64,7 @@ trait Accounting extends DSLUtils with Loggable { val costPolicy = dslResource.costPolicy val isDiscrete = costPolicy.isDiscrete - val oldValue = oldResourceEvent.value + val oldValueM = oldResourceEventM.map(_.value) val newValue = newResourceEvent.value /* This is a safeguard against the special case where the last @@ -76,7 +76,7 @@ trait Accounting extends DSLUtils with Loggable { if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) { Just(List()) } else { - val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValue, newValue).forNoVal(Just(0.0)) + val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValueM, newValue).forNoVal(Just(0.0)) for { amount <- creditCalculationValueM } yield { @@ -84,7 +84,7 @@ trait Accounting extends DSLUtils with Loggable { // above, since this point won't be reached in case of error. val isFinal = dslResource.costPolicy match { case OnOffCostPolicy => - OnOffPolicyResourceState(oldValue) match { + OnOffPolicyResourceState(oldValueM) match { case OnResourceState => false case OffResourceState => true } @@ -164,7 +164,7 @@ trait Accounting extends DSLUtils with Loggable { } } - val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(currentValue, resourceEvent.value) + val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(Just(currentValue), resourceEvent.value) val amount = creditCalculationValueM match { case failed @ Failed(_, _) ⇒ return failed diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala index 0541da7..463a45f 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala @@ -35,7 +35,7 @@ package gr.grnet.aquarium.logic.accounting.dsl -import com.ckkloverdos.maybe.{Failed, Just, Maybe} +import com.ckkloverdos.maybe.{NoVal, Failed, Just, Maybe} /** @@ -79,12 +79,12 @@ abstract class DSLCostPolicy(val name: String) { /** * Given the old value and a value from a resource event, compute the new one. */ - def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double): Double + def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] /** * Get the value that will be used in credit calculation in Accounting.chargeEvents */ - def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] + def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] /** * An event's value by itself should carry enough info to characterize it billable or not. @@ -133,12 +133,19 @@ case object ContinuousCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.contin override def resourceEventValueIsDiff = true - def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = { - oldValue + newEventValue + def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = { + oldValueM match { + case Just(oldValue) ⇒ + Just(oldValue + newEventValue) + case NoVal ⇒ + Failed(new Exception("NoVal for oldValue instead of Just")) + case Failed(e, m) ⇒ + Failed(new Exception("Failed for oldValue instead of Just", e), m) + } } - def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = { - Just(oldValue) + def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = { + oldValueM } } @@ -159,27 +166,35 @@ case object OnOffCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.onoff) { override def resourceEventValueIsAbs = true - def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = { - newEventValue + def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = { + Just(newEventValue) } - def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = { + def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = { import OnOffCostPolicyValues.{ON, OFF} def exception(rs: OnOffPolicyResourceState) = new Exception("Resource state transition error (%s -> %s)".format(rs, rs)) def failed(rs: OnOffPolicyResourceState) = Failed(exception(rs)) - - (oldValue, newEventValue) match { - case (ON, ON) ⇒ - failed(OnResourceState) - case (ON, OFF) ⇒ - Just(OFF) - case (OFF, ON) ⇒ - Just(ON) - case (OFF, OFF) ⇒ - failed(OffResourceState) + + oldValueM match { + case Just(oldValue) ⇒ + (oldValue, newEventValue) match { + case (ON, ON) ⇒ + failed(OnResourceState) + case (ON, OFF) ⇒ + Just(OFF) + case (OFF, ON) ⇒ + Just(ON) + case (OFF, OFF) ⇒ + failed(OffResourceState) + } + + case NoVal ⇒ + Failed(new Exception("NoVal for oldValue instead of Just")) + case Failed(e, m) ⇒ + Failed(new Exception("Failed for oldValue instead of Just", e), m) } } @@ -212,11 +227,11 @@ case object DiscreteCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.discrete override def resourceEventValueIsDiff = true - def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = { - newEventValue + def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = { + Just(newEventValue) } - def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = { + def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = { Just(newEventValue) } } diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala index 7650c51..a545a1b 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala @@ -38,6 +38,7 @@ package gr.grnet.aquarium.logic.events import net.liftweb.json.{JsonAST, Xml} import gr.grnet.aquarium.util.json.JsonHelpers import gr.grnet.aquarium.logic.accounting.dsl._ +import com.ckkloverdos.maybe.Maybe /** * Event sent to Aquarium by clients for resource accounting. @@ -114,6 +115,20 @@ case class ResourceEvent( } def copyWithReceivedMillis(millis: Long) = copy(receivedMillis = millis) + + /** + * Find the cost policy of the resource named in this resource event. + * + * We do not expect cost policies for resources to change, because they are supposed + * to be one of their constant characteristics. That is why do not issue a time-dependent + * query here for the event's current policy. + * + * Should the need arises to change the cost policy for a resource, this is a good enough + * reason to consider creating another type of resource. + */ + def findCostPolicy(defaultPolicy: DSLPolicy): Maybe[DSLCostPolicy] = { + defaultPolicy.findResource(this.safeResource).map(_.costPolicy): Maybe[DSLCostPolicy] + } } object ResourceEvent { diff --git a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala index 350e719..7c69317 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala @@ -38,6 +38,7 @@ package user import gr.grnet.aquarium.util.json.JsonSupport import gr.grnet.aquarium.logic.accounting.Policy +import com.ckkloverdos.maybe.{Maybe, Just} /** * Snapshot of data that are user-related. @@ -125,26 +126,30 @@ case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshot def addOrUpdateResourceSnapshot(name: String, instanceId: String, newEventValue: Double, - snapshotTime: Long): (OwnedResourcesSnapshot, Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = { + snapshotTime: Long): (Maybe[OwnedResourcesSnapshot], Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = { val newRCInstance = ResourceInstanceSnapshot(name, instanceId, newEventValue, snapshotTime) val oldRCInstanceOpt = this.findResourceSnapshot(name, instanceId) - val newData = oldRCInstanceOpt match { + val newDataM = oldRCInstanceOpt match { case Some(oldRCInstance) ⇒ // Need to delete the old one and add the new one // FIXME: Get rid of this Policy.policy val costPolicy = Policy.policy.findResource(name).get.costPolicy - val newValue = costPolicy.computeNewResourceInstanceValue(oldRCInstance.value, newRCInstance.value/* =newEventValue */) + val newValueM = costPolicy.computeNewResourceInstanceValue(Just(oldRCInstance.value), newRCInstance.value/* =newEventValue */) + newValueM.map { newValue ⇒ + newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId))) + } - newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId))) case None ⇒ // Resource not found, so this is the first time and we just add the new snapshot - newRCInstance :: data + Just(newRCInstance :: data) } - val newOwnedResources = this.copy(data = newData, snapshotTime = snapshotTime) + val newOwnedResourcesM = newDataM.map { newData ⇒ + this.copy(data = newData, snapshotTime = snapshotTime) + } - (newOwnedResources, oldRCInstanceOpt, newRCInstance) + (newOwnedResourcesM, oldRCInstanceOpt, newRCInstance) } } diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index cfd1d70..2e31644 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -42,7 +42,7 @@ import gr.grnet.aquarium.util.date.DateCalculator import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} import gr.grnet.aquarium.logic.accounting.Accounting import gr.grnet.aquarium.logic.events.ResourceEvent -import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLAgreement} +import gr.grnet.aquarium.logic.accounting.dsl.{DSLCostPolicy, DSLPolicy, DSLAgreement} sealed abstract class CalculationType(_name: String) { def name = _name @@ -213,7 +213,7 @@ class UserStateComputations { // implied in order to do billing calculations (e.g. the "off" vmtime resource event) var workingUserState = newStartUserState - for(nextRCEvent <- allBillingPeriodRelevantRCEvents) { + for(newResourceEvent <- allBillingPeriodRelevantRCEvents) { // We need to do these kinds of calculations: // 1. Credit state calculations // 2. Resource state calculations @@ -233,24 +233,45 @@ class UserStateComputations { // We need: // A. The previous event - def findPreviousRCEventOf(rcEvent: ResourceEvent): Option[ResourceEvent] = { - previousRCEventsMap.get(rcEvent.fullResourceInfo) - } - def updatePreviousRCEventWith(rcEvent: ResourceEvent): Unit = { - previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent - } - - val prevRCEvent = findPreviousRCEventOf(nextRCEvent) match { - case Some(prevRCEvent) ⇒ - prevRCEvent - case None ⇒ - // Must query the DB????? - } + /** + * FIXME: implement + */ + def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = { + NoVal + } - // B. The current event: [✓][✔][✗][✘]☒ OK + def findPreviousRCEventOf(rcEvent: ResourceEvent): Maybe[ResourceEvent] = { + rcEvent.findCostPolicy(defaultPolicy) match { + case Just(costPolicy) ⇒ + if(costPolicy.needsPreviousEventForCreditCalculation) { + // Get a previous resource only if this is needed by the policy + previousRCEventsMap.get(rcEvent.fullResourceInfo) match { + case Some(previousRCEvent) ⇒ + Just(previousRCEvent) + case None ⇒ + queryForPreviousRCEvent(rcEvent) + } + } else { + // No need for previous event. Will return NoVal + NoVal + } + + case NoVal ⇒ + NoVal + case failed@ Failed(_, _) ⇒ + failed + } + + } -// accounting.chargeEvent() + def updatePreviousRCEventWith(rcEventM: Maybe[ResourceEvent]): Unit = { + for(rcEvent <- rcEventM) { + previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent + } + } + + val oldResourceEventM = findPreviousRCEventOf(newResourceEvent) } diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala index 3dc1f42..b9848be 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala @@ -38,7 +38,6 @@ package gr.grnet.aquarium.user.actor import gr.grnet.aquarium.actor._ import gr.grnet.aquarium.Configurator import gr.grnet.aquarium.processor.actor._ -import com.ckkloverdos.maybe.{Failed, NoVal, Just} import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting} import gr.grnet.aquarium.user._ import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent} @@ -46,6 +45,7 @@ import java.util.Date import gr.grnet.aquarium.util.{DateUtils, Loggable} import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource} import gr.grnet.aquarium.util.date.TimeHelpers +import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just} /** @@ -153,7 +153,7 @@ class UserActor extends AquariumActor val eventsDB = _configurator.storeProvider.resourceEventStore val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from) val numResourceEvents = resourceEvents.size - _userState = replayResourceEvents(_userState, resourceEvents, from, to) +// _userState = replayResourceEvents(_userState, resourceEvents, from, to) //Rebuild state from wallet entries val wallet = _configurator.storeProvider.walletEntryStore @@ -198,31 +198,6 @@ class UserActor extends AquariumActor initState.copy(active = act, roles = rol) } - /** - * Replay resource events on the provided user state - */ - def replayResourceEvents(initState: UserState, events: List[ResourceEvent], - from: Long, to: Long): UserState = { - var res = initState.ownedResources - events - .filter(e => e.occurredMillis >= from && e.occurredMillis < to) - .foreach { - e => - val name = Policy.policy.findResource(e.resource) match { - case Some(x) => x.name - case None => "" - } - - val instanceId = e.instanceId - res = res.addOrUpdateResourceSnapshot(name, - instanceId, e.value, e.occurredMillis)._1 - } - if (!events.isEmpty) { - val snapTime = events.map{e => e.occurredMillis}.max - res = res.copy(snapshotTime = snapTime) - } - initState.copy(ownedResources = res) - } /** * Replay wallet entries on the provided user state @@ -245,77 +220,6 @@ class UserActor extends AquariumActor } /** - * Update wallet entries for all unprocessed events - */ - def calcWalletEntries(): Unit = { - ensureUserState - - if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return - val eventsDB = _configurator.storeProvider.resourceEventStore - val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime) - val policy = Policy.policy - - val walletEntries = resourceEvents.map { - ev => - // TODO: Check that agreement exists - val agreement = policy.findAgreement(_userState.agreement.data).get - - val resource = policy.findResource(ev.resource) match { - case Some(x) => x - case None => - val errMsg = "Cannot find resource: %s".format(ev.resource) - ERROR(errMsg) - throw new AccountingException(errMsg) // FIXME: to throw or not to throw? - } - - // get resource instance id *only* for complex resource - // otherwise we could have used `resource.findInstanceId(ev.details)` - val instid = resource.isComplex match { - case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField) - case false => None - } - - var currentValue: Double = 0.0 - var currentSnapshotTime = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId) match { - case Some(x) => x.snapshotTime - case None => Long.MaxValue //To trigger recalculation - } - - if (currentSnapshotTime > ev.occurredMillis) { - //Event is older that current state. Rebuild state up to event timestamp - val resHistory = - ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.instanceId, ev.eventVersion, 0, ev.details) :: - eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis) - INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis)); - var res = OwnedResourcesSnapshot(List(), 0) - resHistory.foreach { - e => - // update resources state - res = res.addOrUpdateResourceSnapshot(e.resource, e.instanceId, e.value, e.occurredMillis)._1 - } - currentSnapshotTime = res.findResourceSnapshot(ev.resource, ev.instanceId).get.snapshotTime - currentValue = res.findResourceSnapshot(ev.resource, ev.instanceId).get.data - } else { - currentValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId).get.data - } - - val entries = chargeEvent(ev, agreement, currentValue, new Date(currentSnapshotTime), - findRelatedEntries(resource, ev.instanceId)) - INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis)) - entries match { - case Just(x) => x - case Failed(e, r) => List() - case NoVal => List() - } - }.flatten - - val walletDB = _configurator.storeProvider.walletEntryStore - walletEntries.foreach(w => walletDB.storeWalletEntry(w)) - - ensureUserState - } - - /** * Persist current user state */ private[this] def saveUserState(): Unit = { @@ -343,7 +247,7 @@ class UserActor extends AquariumActor ERROR("Received %s but my userId = %s".format(m, this._userId)) } else { //ensureUserState() - calcWalletEntries() +// calcWalletEntries() //processResourceEvent(resourceEvent, true) } @@ -357,7 +261,9 @@ class UserActor extends AquariumActor case m @ RequestUserBalance(userId, timestamp) ⇒ if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000) - calcWalletEntries() + { +// calcWalletEntries() + } self reply UserResponseGetBalance(userId, _userState.credits.data) case m @ UserRequestGetState(userId, timestamp) ⇒ -- 1.7.10.4