/*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
package gr.grnet.aquarium.logic.accounting
+import gr.grnet.aquarium.util.shortClassNameOf
+import algorithm.CostPolicyAlgorithmCompiler
import dsl._
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
import collection.immutable.SortedMap
import java.util.Date
-import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
+import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
+import gr.grnet.aquarium.store.PolicyStore
+import gr.grnet.aquarium.event.{WalletEntry}
+import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
+
+/**
+ * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
+ *
+ * @param startMillis
+ * @param stopMillis
+ * @param algorithmDefinition
+ * @param unitPrice
+ * @param computedCredits The computed credits
+ */
+case class Chargeslot(startMillis: Long,
+ stopMillis: Long,
+ algorithmDefinition: String,
+ unitPrice: Double,
+ computedCredits: Option[Double] = None) {
+
+ override def toString = "%s(%s, %s, %s, %s, %s)".format(
+ shortClassNameOf(this),
+ new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
+ new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
+ unitPrice,
+ computedCredits,
+ algorithmDefinition
+ )
+}
/**
* Methods for converting accounting events to wallet entries.
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
trait Accounting extends DSLUtils with Loggable {
-
- def chargeEvent2( oldResourceEventM: Maybe[ResourceEvent],
- newResourceEvent: ResourceEvent,
- dslAgreement: DSLAgreement,
- lastSnapshotDate: Date,
- related: Traversable[WalletEntry]): Maybe[Traversable[WalletEntry]] = {
- Maybe {
- val dslPolicy: DSLPolicy = Policy.policy // TODO: query based on time
- val resourceEvent = newResourceEvent
- dslPolicy.findResource(resourceEvent.resource) match {
+ /**
+ * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
+ *
+ * @param referenceTimeslot
+ * @param policyTimeslots
+ * @param agreementTimeslots
+ * @return
+ */
+ protected
+ def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
+ policyTimeslots: List[Timeslot],
+ agreementTimeslots: List[Timeslot],
+ clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
+
+// val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
+// clog.begin()
+
+ // Align policy and agreement validity timeslots to the referenceTimeslot
+ val alignedPolicyTimeslots = referenceTimeslot.align(policyTimeslots)
+ val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
+
+// clog.debug("referenceTimeslot = %s", referenceTimeslot)
+// clog.debugSeq("alignedPolicyTimeslots", alignedPolicyTimeslots, 0)
+// clog.debugSeq("alignedAgreementTimeslots", alignedAgreementTimeslots, 0)
+
+ val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
+// clog.debugSeq("result", result, 1)
+// clog.end()
+ result
+ }
+
+ /**
+ * Given a reference timeslot, we have to break it up to a series of timeslots where a particular
+ * algorithm and price unit is in effect.
+ *
+ */
+ protected
+ def resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot: Timeslot,
+ agreement: DSLAgreement,
+ clogOpt: Option[ContextualLogger] = None):
+ (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
+
+ val clog = ContextualLogger.fromOther(clogOpt, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
+
+ // Note that most of the code is taken from calcChangeChunks()
+ val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
+ val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
+ val chargeChunks = splitChargeChunks(alg, pri)
+ val algorithmByTimeslot = chargeChunks._1
+ val pricelistByTimeslot = chargeChunks._2
+
+ assert(algorithmByTimeslot.size == pricelistByTimeslot.size)
+
+ (algorithmByTimeslot, pricelistByTimeslot)
+ }
+
+ protected
+ def computeInitialChargeslots(referenceTimeslot: Timeslot,
+ dslResource: DSLResource,
+ policiesByTimeslot: Map[Timeslot, DSLPolicy],
+ agreementNamesByTimeslot: Map[Timeslot, String],
+ clogOpt: Option[ContextualLogger] = None): List[Chargeslot] = {
+
+ val clog = ContextualLogger.fromOther(clogOpt, logger, "computeInitialChargeslots()")
+// clog.begin()
+
+ val policyTimeslots = policiesByTimeslot.keySet
+ val agreementTimeslots = agreementNamesByTimeslot.keySet
+
+// clog.debugMap("policiesByTimeslot", policiesByTimeslot, 1)
+// clog.debugMap("agreementNamesByTimeslot", agreementNamesByTimeslot, 1)
+
+ def getPolicy(ts: Timeslot): DSLPolicy = {
+ policiesByTimeslot.find(_._1.contains(ts)).get._2
+ }
+ def getAgreementName(ts: Timeslot): String = {
+ agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
+ }
+
+ // 1. Round ONE: split time according to overlapping policies and agreements.
+// clog.begin("ROUND 1")
+ val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
+// clog.debugSeq("alignedTimeslots", alignedTimeslots, 1)
+// clog.end("ROUND 1")
+
+ // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
+ // fine-grained timeslots according to applicable algorithms.
+ // Then pack the info into charge slots.
+// clog.begin("ROUND 2")
+ val allChargeslots = for {
+ alignedTimeslot <- alignedTimeslots
+ } yield {
+// val alignedTimeslotMsg = "alignedTimeslot = %s".format(alignedTimeslot)
+// clog.begin(alignedTimeslotMsg)
+
+ val dslPolicy = getPolicy(alignedTimeslot)
+// clog.debug("dslPolicy = %s", dslPolicy)
+ val agreementName = getAgreementName(alignedTimeslot)
+// clog.debug("agreementName = %s", agreementName)
+ val agreementOpt = dslPolicy.findAgreement(agreementName)
+// clog.debug("agreementOpt = %s", agreementOpt)
+
+ agreementOpt match {
case None ⇒
- throw new AccountingException("No resource [%s]".format(resourceEvent.resource))
- case Some(dslResource) ⇒
-
- val costPolicy = dslResource.costPolicy
- val isDiscrete = costPolicy.isDiscrete
- val oldValueM = oldResourceEventM.map(_.value)
- val newValue = newResourceEvent.value
-
- /* This is a safeguard against the special case where the last
- * resource state update, as marked by the lastUpdate parameter
- * is equal to the time of the event occurrence. This means that
- * this is the first time the resource state has been recorded.
- * Charging in this case only makes sense for discrete resources.
- */
- if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) {
- Just(List())
- } else {
- val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(oldValueM, newValue).forNoVal(Just(0.0))
- for {
- amount <- creditCalculationValueM
- } yield {
- // We don't do strict checking for all cases for OnOffPolicies as
- // above, since this point won't be reached in case of error.
- val isFinal = dslResource.costPolicy match {
- case OnOffCostPolicy =>
- OnOffPolicyResourceState(oldValueM) match {
- case OnResourceState => false
- case OffResourceState => true
- }
- case _ => true
- }
-
- val timeslot = dslResource.costPolicy match {
- case DiscreteCostPolicy => Timeslot(new Date(resourceEvent.occurredMillis),
- new Date(resourceEvent.occurredMillis + 1))
- case _ => Timeslot(lastSnapshotDate, new Date(resourceEvent.occurredMillis))
- }
-
- val chargeChunks = calcChangeChunks(dslAgreement, amount, dslResource, timeslot)
-
- val timeReceived = System.currentTimeMillis
-
- val rel = related.map{x => x.sourceEventIDs}.flatten ++ Traversable(resourceEvent.id)
-
- val entries = chargeChunks.map {
- chargedChunk ⇒
- WalletEntry(
- id = CryptoUtils.sha1(chargedChunk.id),
- occurredMillis = resourceEvent.occurredMillis,
- receivedMillis = timeReceived,
- sourceEventIDs = rel.toList,
- value = chargedChunk.cost,
- reason = chargedChunk.reason,
- userId = resourceEvent.userId,
- resource = resourceEvent.resource,
- instanceId = resourceEvent.instanceId,
- finalized = isFinal
- )
- } // entries
-
- entries
- } // yield
- } // else
+ val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
+ clog.error("%s", errMsg)
+ throw new AquariumException(errMsg)
+
+ case Some(agreement) ⇒
+ // TODO: Factor this out, just like we did with:
+ // TODO: val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
+ // Note that most of the code is already taken from calcChangeChunks()
+ val r = resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot, agreement, Some(clog))
+ val algorithmByTimeslot: Map[Timeslot, DSLAlgorithm] = r._1
+ val pricelistByTimeslot: Map[Timeslot, DSLPriceList] = r._2
+
+ // Now, the timeslots must be the same
+ val finegrainedTimeslots = algorithmByTimeslot.keySet
+
+ val chargeslots = for {
+ finegrainedTimeslot <- finegrainedTimeslots
+ } yield {
+// val finegrainedTimeslotMsg = "finegrainedTimeslot = %s".format(finegrainedTimeslot)
+// clog.begin(finegrainedTimeslotMsg)
+
+ val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+// clog.debug("dslAlgorithm = %s", dslAlgorithm)
+// clog.debugMap("dslAlgorithm.algorithms", dslAlgorithm.algorithms, 1)
+ val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+// clog.debug("dslPricelist = %s", dslPricelist)
+// clog.debug("dslResource = %s", dslResource)
+ val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
+// clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
+ val priceUnitOpt = dslPricelist.prices.get(dslResource)
+// clog.debug("priceUnitOpt = %s", priceUnitOpt)
+
+ val chargeslot = (algorithmDefOpt, priceUnitOpt) match {
+ case (None, None) ⇒
+ throw new AquariumException(
+ "Unknown algorithm and price unit for resource %s during %s".
+ format(dslResource, finegrainedTimeslot))
+ case (None, _) ⇒
+ throw new AquariumException(
+ "Unknown algorithm for resource %s during %s".
+ format(dslResource, finegrainedTimeslot))
+ case (_, None) ⇒
+ throw new AquariumException(
+ "Unknown price unit for resource %s during %s".
+ format(dslResource, finegrainedTimeslot))
+ case (Some(algorithmDefinition), Some(priceUnit)) ⇒
+ Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
+ }
+
+// clog.end(finegrainedTimeslotMsg)
+ chargeslot
+ }
+
+// clog.end(alignedTimeslotMsg)
+ chargeslots.toList
}
- }.flatten1
+ }
+// clog.end("ROUND 2")
+
+
+ val result = allChargeslots.flatten
+// clog.debugSeq("result", allChargeslots, 1)
+// clog.end()
+ result
+ }
+
+ /**
+ * Compute the charge slots generated by a particular resource event.
+ *
+ */
+ def computeFullChargeslots(previousResourceEventOpt: Option[ResourceEventModel],
+ currentResourceEvent: ResourceEventModel,
+ oldCredits: Double,
+ oldTotalAmount: Double,
+ newTotalAmount: Double,
+ dslResource: DSLResource,
+ defaultResourceMap: DSLResourcesMap,
+ agreementNamesByTimeslot: Map[Timeslot, String],
+ algorithmCompiler: CostPolicyAlgorithmCompiler,
+ policyStore: PolicyStore,
+ clogOpt: Option[ContextualLogger] = None): (Timeslot, List[Chargeslot]) = {
+
+ val clog = ContextualLogger.fromOther(clogOpt, logger, "computeFullChargeslots()")
+// clog.begin()
+
+ val occurredDate = currentResourceEvent.occurredDate
+ val occurredMillis = currentResourceEvent.occurredMillis
+ val costPolicy = dslResource.costPolicy
+
+ val dsl = new DSL{}
+ val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
+ // We need a previous event
+ case true ⇒
+ previousResourceEventOpt match {
+ // We have a previous event
+ case Some(previousResourceEvent) ⇒
+// clog.debug("Have previous event")
+// clog.debug("previousValue = %s", previousResourceEvent.value)
+
+ val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
+// clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+
+ // all policies within the interval from previous to current resource event
+// clog.debug("Calling policyStore.loadAndSortPoliciesWithin(%s)", referenceTimeslot)
+ val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, dsl)
+// clog.debugMap("==> relevantPolicies", relevantPolicies, 0)
+
+ (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
+
+ // We do not have a previous event
+ case None ⇒
+ throw new AquariumException(
+ "Unable to charge. No previous event given for %s".
+ format(currentResourceEvent.toDebugString()))
+ }
+
+ // We do not need a previous event
+ case false ⇒
+ // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
+ // referring to (almost) an instant in time
+// clog.debug("DO NOT have previous event")
+ val previousValue = costPolicy.getResourceInstanceUndefinedAmount
+// clog.debug("previousValue = costPolicy.getResourceInstanceUndefinedAmount = %s", previousValue)
+
+ val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
+// clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+
+// clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
+ val relevantPolicyOpt = policyStore.loadValidPolicyAt(occurredMillis, dsl)
+// clog.debug(" ==> relevantPolicyM = %s", relevantPolicyM)
+
+ val relevantPolicies = relevantPolicyOpt match {
+ case Some(relevantPolicy) ⇒
+ Map(referenceTimeslot -> relevantPolicy)
+
+ case None ⇒
+ throw new AquariumInternalError("No relevant policy found for %s".format(referenceTimeslot))
+ }
+
+ (referenceTimeslot, relevantPolicies, previousValue)
+ }
+
+ val initialChargeslots = computeInitialChargeslots(
+ referenceTimeslot,
+ dslResource,
+ relevantPolicies,
+ agreementNamesByTimeslot,
+ Some(clog)
+ )
+
+ val fullChargeslots = initialChargeslots.map {
+ case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
+ val execAlgorithm = algorithmCompiler.compile(algorithmDefinition)
+ val valueMap = costPolicy.makeValueMap(
+ oldCredits,
+ oldTotalAmount,
+ newTotalAmount,
+ stopMillis - startMillis,
+ previousValue,
+ currentResourceEvent.value,
+ unitPrice
+ )
+
+// clog.debug("execAlgorithm = %s", execAlgorithm)
+ clog.debugMap("valueMap", valueMap, 1)
+
+ // This is it
+ val credits = execAlgorithm.apply(valueMap)
+ chargeslot.copy(computedCredits = Some(credits))
+ }
+
+ val result = referenceTimeslot -> fullChargeslots
+
+ result
}
/**
* @param previousAmount The current state of the resource
* @param previousOccurred The last time the resource state was updated
*/
- def chargeEvent(currentResourceEvent: ResourceEvent,
+ def chargeEvent(currentResourceEvent: ResourceEventModel,
agreements: SortedMap[Timeslot, String],
previousAmount: Double,
previousOccurred: Date,
related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
- val occurredDate = new Date(currentResourceEvent.occurredMillis)
+ val occuredDate = new Date(currentResourceEvent.occurredMillis)
/* The following makes sure that agreements exist between the start
* and end days of the processed event. As agreement updates are
*/
assert(
agreements.keysIterator.exists {
- p => p.includes(occurredDate)
+ p => p.includes(occuredDate)
} && agreements.keysIterator.exists {
p => p.includes(previousOccurred)
}
)
- val t = Timeslot(previousOccurred, occurredDate)
+ val t = Timeslot(previousOccurred, occuredDate)
// Align policy and agreement validity timeslots to the event's boundaries
val policyTimeslots = t.align(
- Policy.policies(previousOccurred, occurredDate).keysIterator.toList)
+ Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
val agreementTimeslots = t.align(agreements.keysIterator.toList)
/*
Some(x)
) match {
case Just(x) => x
- case Failed(f, e) => return Failed(f,e)
+ case Failed(f) => return Failed(f)
case NoVal => List()
}
entries
* resource event boundaries
* @return A list of wallet entries, one for each
*/
- def chargeEvent(event: ResourceEvent,
+ def chargeEvent(event: ResourceEventModel,
agr: DSLAgreement,
previousAmount: Double,
previousOccurred: Date,
chargeFor.map{x => assert(true,
Timeslot(previousOccurred, new Date(event.occurredMillis)))}
- if (!event.validate())
- return Failed(new AccountingException("Event not valid"))
+// if (!event.validate())
+// return Failed(new AccountingException("Event not valid"))
val policy = Policy.policy
val dslResource = policy.findResource(event.resource) match {
val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
val amount = creditCalculationValueM match {
- case failed @ Failed(_, _) ⇒
+ case failed @ Failed(_) ⇒
return failed
case Just(amount) ⇒
amount
*/
val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
- val timeReceived = System.currentTimeMillis
+ val timeReceived = TimeHelpers.nowMillis()
- val rel = related.map{x => x.sourceEventIDs}.flatten ++ List(event.id)
+ val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
- /*
- * Convert charge chunks to wallet entries.
- */
- val entries = chargeChunks.map {
- c =>
+ val entries = chargeChunks.map { c=>
WalletEntry(
id = CryptoUtils.sha1(c.id),
occurredMillis = event.occurredMillis,
sourceEventIDs = rel,
value = c.cost,
reason = c.reason,
- userId = event.userId,
+ userId = event.userID,
resource = event.resource,
- instanceId = event.instanceId,
+ instanceId = event.instanceID,
finalized = isFinal
)
}
*/
private[logic] def alignTimeslots(a: List[Timeslot],
b: List[Timeslot]): List[Timeslot] = {
- if (a.isEmpty) return b.tail
- if (b.isEmpty) return a.tail
+
+ def safeTail(foo: List[Timeslot]) = foo match {
+ case Nil => List()
+ case x :: Nil => List()
+ case x :: rest => rest
+ }
+
+ if (a.isEmpty) return b
+ if (b.isEmpty) return a
+
assert (a.head.from == b.head.from)
if (a.head.endsAfter(b.head)) {
- a.head.slice(b.head.to) ::: alignTimeslots(a.tail, b.tail)
+ val slice = a.head.slice(b.head.to)
+ slice.head :: alignTimeslots(slice.last :: a.tail, safeTail(b))
} else if (b.head.endsAfter(a.head)) {
- b.head.slice(a.head.to) ::: alignTimeslots(a.tail, b.tail)
+ val slice = b.head.slice(a.head.to)
+ slice.head :: alignTimeslots(safeTail(a), slice.last :: b.tail)
} else {
- a.head :: alignTimeslots(a.tail, b.tail)
+ a.head :: alignTimeslots(safeTail(a), safeTail(b))
}
}
}
def id(): String =
CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
- resource.name, System.currentTimeMillis()))
+ resource.name, TimeHelpers.nowMillis()))
}
/** An exception raised when something goes wrong with accounting */
-class AccountingException(msg: String) extends Exception(msg)
+class AccountingException(msg: String) extends AquariumException(msg)