*/
trait Accounting extends DSLUtils with Loggable {
- def chargeEvent( oldResourceEvent: ResourceEvent,
+ def chargeEvent2( oldResourceEventM: Maybe[ResourceEvent],
newResourceEvent: ResourceEvent,
dslAgreement: DSLAgreement,
lastSnapshotDate: Date,
val costPolicy = dslResource.costPolicy
val isDiscrete = costPolicy.isDiscrete
- val oldValue = oldResourceEvent.value
+ val oldValueM = oldResourceEventM.map(_.value)
val newValue = newResourceEvent.value
/* This is a safeguard against the special case where the last
if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) {
Just(List())
} else {
- val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValue, newValue).forNoVal(Just(0.0))
+ val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValueM, newValue).forNoVal(Just(0.0))
for {
amount <- creditCalculationValueM
} yield {
// above, since this point won't be reached in case of error.
val isFinal = dslResource.costPolicy match {
case OnOffCostPolicy =>
- OnOffPolicyResourceState(oldValue) match {
+ OnOffPolicyResourceState(oldValueM) match {
case OnResourceState => false
case OffResourceState => true
}
}
}
- val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(currentValue, resourceEvent.value)
+ val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(Just(currentValue), resourceEvent.value)
val amount = creditCalculationValueM match {
case failed @ Failed(_, _) ⇒
return failed
package gr.grnet.aquarium.logic.accounting.dsl
-import com.ckkloverdos.maybe.{Failed, Just, Maybe}
+import com.ckkloverdos.maybe.{NoVal, Failed, Just, Maybe}
/**
/**
* Given the old value and a value from a resource event, compute the new one.
*/
- def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double): Double
+ def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double]
/**
* Get the value that will be used in credit calculation in Accounting.chargeEvents
*/
- def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double]
+ def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double]
/**
* An event's value by itself should carry enough info to characterize it billable or not.
override def resourceEventValueIsDiff = true
- def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
- oldValue + newEventValue
+ def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+ oldValueM match {
+ case Just(oldValue) ⇒
+ Just(oldValue + newEventValue)
+ case NoVal ⇒
+ Failed(new Exception("NoVal for oldValue instead of Just"))
+ case Failed(e, m) ⇒
+ Failed(new Exception("Failed for oldValue instead of Just", e), m)
+ }
}
- def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
- Just(oldValue)
+ def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
+ oldValueM
}
}
override def resourceEventValueIsAbs = true
- def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
- newEventValue
+ def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+ Just(newEventValue)
}
- def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
+ def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
import OnOffCostPolicyValues.{ON, OFF}
def exception(rs: OnOffPolicyResourceState) =
new Exception("Resource state transition error (%s -> %s)".format(rs, rs))
def failed(rs: OnOffPolicyResourceState) =
Failed(exception(rs))
-
- (oldValue, newEventValue) match {
- case (ON, ON) ⇒
- failed(OnResourceState)
- case (ON, OFF) ⇒
- Just(OFF)
- case (OFF, ON) ⇒
- Just(ON)
- case (OFF, OFF) ⇒
- failed(OffResourceState)
+
+ oldValueM match {
+ case Just(oldValue) ⇒
+ (oldValue, newEventValue) match {
+ case (ON, ON) ⇒
+ failed(OnResourceState)
+ case (ON, OFF) ⇒
+ Just(OFF)
+ case (OFF, ON) ⇒
+ Just(ON)
+ case (OFF, OFF) ⇒
+ failed(OffResourceState)
+ }
+
+ case NoVal ⇒
+ Failed(new Exception("NoVal for oldValue instead of Just"))
+ case Failed(e, m) ⇒
+ Failed(new Exception("Failed for oldValue instead of Just", e), m)
}
}
override def resourceEventValueIsDiff = true
- def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
- newEventValue
+ def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+ Just(newEventValue)
}
- def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
+ def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
Just(newEventValue)
}
}
import net.liftweb.json.{JsonAST, Xml}
import gr.grnet.aquarium.util.json.JsonHelpers
import gr.grnet.aquarium.logic.accounting.dsl._
+import com.ckkloverdos.maybe.Maybe
/**
* Event sent to Aquarium by clients for resource accounting.
}
def copyWithReceivedMillis(millis: Long) = copy(receivedMillis = millis)
+
+ /**
+ * Find the cost policy of the resource named in this resource event.
+ *
+ * We do not expect cost policies for resources to change, because they are supposed
+ * to be one of their constant characteristics. That is why do not issue a time-dependent
+ * query here for the event's current policy.
+ *
+ * Should the need arises to change the cost policy for a resource, this is a good enough
+ * reason to consider creating another type of resource.
+ */
+ def findCostPolicy(defaultPolicy: DSLPolicy): Maybe[DSLCostPolicy] = {
+ defaultPolicy.findResource(this.safeResource).map(_.costPolicy): Maybe[DSLCostPolicy]
+ }
}
object ResourceEvent {
import gr.grnet.aquarium.util.json.JsonSupport
import gr.grnet.aquarium.logic.accounting.Policy
+import com.ckkloverdos.maybe.{Maybe, Just}
/**
* Snapshot of data that are user-related.
def addOrUpdateResourceSnapshot(name: String,
instanceId: String,
newEventValue: Double,
- snapshotTime: Long): (OwnedResourcesSnapshot, Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = {
+ snapshotTime: Long): (Maybe[OwnedResourcesSnapshot], Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = {
val newRCInstance = ResourceInstanceSnapshot(name, instanceId, newEventValue, snapshotTime)
val oldRCInstanceOpt = this.findResourceSnapshot(name, instanceId)
- val newData = oldRCInstanceOpt match {
+ val newDataM = oldRCInstanceOpt match {
case Some(oldRCInstance) ⇒
// Need to delete the old one and add the new one
// FIXME: Get rid of this Policy.policy
val costPolicy = Policy.policy.findResource(name).get.costPolicy
- val newValue = costPolicy.computeNewResourceInstanceValue(oldRCInstance.value, newRCInstance.value/* =newEventValue */)
+ val newValueM = costPolicy.computeNewResourceInstanceValue(Just(oldRCInstance.value), newRCInstance.value/* =newEventValue */)
+ newValueM.map { newValue ⇒
+ newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId)))
+ }
- newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId)))
case None ⇒
// Resource not found, so this is the first time and we just add the new snapshot
- newRCInstance :: data
+ Just(newRCInstance :: data)
}
- val newOwnedResources = this.copy(data = newData, snapshotTime = snapshotTime)
+ val newOwnedResourcesM = newDataM.map { newData ⇒
+ this.copy(data = newData, snapshotTime = snapshotTime)
+ }
- (newOwnedResources, oldRCInstanceOpt, newRCInstance)
+ (newOwnedResourcesM, oldRCInstanceOpt, newRCInstance)
}
}
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
import gr.grnet.aquarium.logic.accounting.Accounting
import gr.grnet.aquarium.logic.events.ResourceEvent
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLAgreement}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLCostPolicy, DSLPolicy, DSLAgreement}
sealed abstract class CalculationType(_name: String) {
def name = _name
// implied in order to do billing calculations (e.g. the "off" vmtime resource event)
var workingUserState = newStartUserState
- for(nextRCEvent <- allBillingPeriodRelevantRCEvents) {
+ for(newResourceEvent <- allBillingPeriodRelevantRCEvents) {
// We need to do these kinds of calculations:
// 1. Credit state calculations
// 2. Resource state calculations
// We need:
// A. The previous event
- def findPreviousRCEventOf(rcEvent: ResourceEvent): Option[ResourceEvent] = {
- previousRCEventsMap.get(rcEvent.fullResourceInfo)
- }
- def updatePreviousRCEventWith(rcEvent: ResourceEvent): Unit = {
- previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent
- }
-
- val prevRCEvent = findPreviousRCEventOf(nextRCEvent) match {
- case Some(prevRCEvent) ⇒
- prevRCEvent
- case None ⇒
- // Must query the DB?????
- }
+ /**
+ * FIXME: implement
+ */
+ def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
+ NoVal
+ }
- // B. The current event: [✓][✔][✗][✘]☒ OK
+ def findPreviousRCEventOf(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
+ rcEvent.findCostPolicy(defaultPolicy) match {
+ case Just(costPolicy) ⇒
+ if(costPolicy.needsPreviousEventForCreditCalculation) {
+ // Get a previous resource only if this is needed by the policy
+ previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
+ case Some(previousRCEvent) ⇒
+ Just(previousRCEvent)
+ case None ⇒
+ queryForPreviousRCEvent(rcEvent)
+ }
+ } else {
+ // No need for previous event. Will return NoVal
+ NoVal
+ }
+
+ case NoVal ⇒
+ NoVal
+ case failed@ Failed(_, _) ⇒
+ failed
+ }
+
+ }
-// accounting.chargeEvent()
+ def updatePreviousRCEventWith(rcEventM: Maybe[ResourceEvent]): Unit = {
+ for(rcEvent <- rcEventM) {
+ previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent
+ }
+ }
+
+ val oldResourceEventM = findPreviousRCEventOf(newResourceEvent)
}
import gr.grnet.aquarium.actor._
import gr.grnet.aquarium.Configurator
import gr.grnet.aquarium.processor.actor._
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
import gr.grnet.aquarium.user._
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
import gr.grnet.aquarium.util.{DateUtils, Loggable}
import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
import gr.grnet.aquarium.util.date.TimeHelpers
+import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
/**
val eventsDB = _configurator.storeProvider.resourceEventStore
val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
val numResourceEvents = resourceEvents.size
- _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+// _userState = replayResourceEvents(_userState, resourceEvents, from, to)
//Rebuild state from wallet entries
val wallet = _configurator.storeProvider.walletEntryStore
initState.copy(active = act, roles = rol)
}
- /**
- * Replay resource events on the provided user state
- */
- def replayResourceEvents(initState: UserState, events: List[ResourceEvent],
- from: Long, to: Long): UserState = {
- var res = initState.ownedResources
- events
- .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
- .foreach {
- e =>
- val name = Policy.policy.findResource(e.resource) match {
- case Some(x) => x.name
- case None => ""
- }
-
- val instanceId = e.instanceId
- res = res.addOrUpdateResourceSnapshot(name,
- instanceId, e.value, e.occurredMillis)._1
- }
- if (!events.isEmpty) {
- val snapTime = events.map{e => e.occurredMillis}.max
- res = res.copy(snapshotTime = snapTime)
- }
- initState.copy(ownedResources = res)
- }
/**
* Replay wallet entries on the provided user state
}
/**
- * Update wallet entries for all unprocessed events
- */
- def calcWalletEntries(): Unit = {
- ensureUserState
-
- if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return
- val eventsDB = _configurator.storeProvider.resourceEventStore
- val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
- val policy = Policy.policy
-
- val walletEntries = resourceEvents.map {
- ev =>
- // TODO: Check that agreement exists
- val agreement = policy.findAgreement(_userState.agreement.data).get
-
- val resource = policy.findResource(ev.resource) match {
- case Some(x) => x
- case None =>
- val errMsg = "Cannot find resource: %s".format(ev.resource)
- ERROR(errMsg)
- throw new AccountingException(errMsg) // FIXME: to throw or not to throw?
- }
-
- // get resource instance id *only* for complex resource
- // otherwise we could have used `resource.findInstanceId(ev.details)`
- val instid = resource.isComplex match {
- case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
- case false => None
- }
-
- var currentValue: Double = 0.0
- var currentSnapshotTime = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId) match {
- case Some(x) => x.snapshotTime
- case None => Long.MaxValue //To trigger recalculation
- }
-
- if (currentSnapshotTime > ev.occurredMillis) {
- //Event is older that current state. Rebuild state up to event timestamp
- val resHistory =
- ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.instanceId, ev.eventVersion, 0, ev.details) ::
- eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
- INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis));
- var res = OwnedResourcesSnapshot(List(), 0)
- resHistory.foreach {
- e =>
- // update resources state
- res = res.addOrUpdateResourceSnapshot(e.resource, e.instanceId, e.value, e.occurredMillis)._1
- }
- currentSnapshotTime = res.findResourceSnapshot(ev.resource, ev.instanceId).get.snapshotTime
- currentValue = res.findResourceSnapshot(ev.resource, ev.instanceId).get.data
- } else {
- currentValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId).get.data
- }
-
- val entries = chargeEvent(ev, agreement, currentValue, new Date(currentSnapshotTime),
- findRelatedEntries(resource, ev.instanceId))
- INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis))
- entries match {
- case Just(x) => x
- case Failed(e, r) => List()
- case NoVal => List()
- }
- }.flatten
-
- val walletDB = _configurator.storeProvider.walletEntryStore
- walletEntries.foreach(w => walletDB.storeWalletEntry(w))
-
- ensureUserState
- }
-
- /**
* Persist current user state
*/
private[this] def saveUserState(): Unit = {
ERROR("Received %s but my userId = %s".format(m, this._userId))
} else {
//ensureUserState()
- calcWalletEntries()
+// calcWalletEntries()
//processResourceEvent(resourceEvent, true)
}
case m @ RequestUserBalance(userId, timestamp) ⇒
if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
- calcWalletEntries()
+ {
+// calcWalletEntries()
+ }
self reply UserResponseGetBalance(userId, _userState.credits.data)
case m @ UserRequestGetState(userId, timestamp) ⇒