Preparing the move to master
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
index 3c978c5..efdc67f 100644 (file)
@@ -40,9 +40,10 @@ import dsl._
 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
 import collection.immutable.SortedMap
 import java.util.Date
-import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
 import gr.grnet.aquarium.util.date.MutableDateCalc
+import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
+import gr.grnet.aquarium.store.PolicyStore
 
 /**
  * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
@@ -77,12 +78,22 @@ trait Accounting extends DSLUtils with Loggable {
   protected
   def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
                                            policyTimeslots: List[Timeslot],
-                                           agreementTimeslots: List[Timeslot]): List[Timeslot] = {
+                                           agreementTimeslots: List[Timeslot],
+                                           clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
+
+    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
+    clog.begin()
+
     // Align policy and agreement validity timeslots to the referenceTimeslot
     val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
     val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
 
-    alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
+    ContextualLogger.debugList(clog, "alignedPolicyTimeslots", alignedPolicyTimeslots)
+    ContextualLogger.debugList(clog, "alignedAgreementTimeslots", alignedAgreementTimeslots)
+
+    val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots, Just(clog))
+    clog.end()
+    result
   }
 
   /**
@@ -98,11 +109,24 @@ trait Accounting extends DSLUtils with Loggable {
   def computeInitialChargeslots(referenceTimeslot: Timeslot,
                                 dslResource: DSLResource,
                                 policiesByTimeslot: Map[Timeslot, DSLPolicy],
-                                agreementNamesByTimeslot: Map[Timeslot, String]): Maybe[List[Chargeslot]] = Maybe {
+                                agreementNamesByTimeslot: Map[Timeslot, String],
+                                contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
+
+    val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeInitialChargeslots()")
+    clog.begin()
 
     val policyTimeslots = policiesByTimeslot.keySet
     val agreementTimeslots = agreementNamesByTimeslot.keySet
 
+    clog.debug("policiesByTimeslot:")
+    clog.withIndent {
+      policyTimeslots.foreach(pt ⇒ clog.debug("%s: %s", pt,  policiesByTimeslot(pt)))
+    }
+    clog.debug("agreementNamesByTimeslot:")
+    clog.withIndent {
+      agreementTimeslots.foreach(at ⇒ clog.debug("%s: %s", at, agreementNamesByTimeslot(at)))
+    }
+
     def getPolicy(ts: Timeslot): DSLPolicy = {
       policiesByTimeslot.find(_._1.contains(ts)).get._2
     }
@@ -111,11 +135,17 @@ trait Accounting extends DSLUtils with Loggable {
     }
 
     // 1. Round ONE: split time according to overlapping policies and agreements.
-    val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList)
+    val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
+    clog.debug("ROUND 1: alignedTimeslots:")
+    clog.withIndent {
+      alignedTimeslots.foreach(ts ⇒ clog.debug("%s", ts))
+    }
 
     // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
     //    fine-grained timeslots according to applicable algorithms.
     //    Then pack the info into charge slots.
+    clog.debug("ROUND 2")
+    clog.indent()
     val allChargeslots = for {
       alignedTimeslot <- alignedTimeslots
     } yield {
@@ -125,7 +155,9 @@ trait Accounting extends DSLUtils with Loggable {
 
       agreementOpt match {
         case None ⇒
-          throw new Exception("Unknown agreement %s during %s".format(agreementName, alignedTimeslot))
+          val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
+          clog.error("%s", errMsg)
+          throw new Exception(errMsg)
 
         case Some(agreement) ⇒
           // TODO: Factor this out, just like we did with:
@@ -147,6 +179,15 @@ trait Accounting extends DSLUtils with Loggable {
             val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
             val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
             val priceUnitOpt = dslPricelist.prices.get(dslResource)
+
+            clog.debug("%s:", finegrainedTimeslot)
+            clog.withIndent {
+              clog.debug("dslAlgorithm = %s", dslAlgorithm)
+              clog.debug("dslPricelist = %s", dslPricelist)
+              clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
+              clog.debug("priceUnitOpt = %s", priceUnitOpt)
+            }
+
             (algorithmDefOpt, priceUnitOpt) match {
               case (None, None) ⇒
                 throw new Exception(
@@ -168,6 +209,9 @@ trait Accounting extends DSLUtils with Loggable {
           chargeslots.toList
       }
     }
+    clog.unindent() // ROUND 2
+
+    clog.end()
 
     allChargeslots.flatten
   }
@@ -184,7 +228,12 @@ trait Accounting extends DSLUtils with Loggable {
                              dslResource: DSLResource,
                              defaultResourceMap: DSLResourcesMap,
                              agreementNamesByTimeslot: Map[Timeslot, String],
-                             algorithmCompiler: CostPolicyAlgorithmCompiler): Maybe[List[Chargeslot]] = Maybe {
+                             algorithmCompiler: CostPolicyAlgorithmCompiler,
+                             policyStore: PolicyStore,
+                             contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
+
+    val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeFullChargeslots()")
+    clog.begin()
 
     val occurredDate = currentResourceEvent.occurredDate
     val costPolicy = dslResource.costPolicy
@@ -195,9 +244,12 @@ trait Accounting extends DSLUtils with Loggable {
         previousResourceEventM match {
           // We have a previous event
           case Just(previousResourceEvent) ⇒
+            clog.debug("Have previous event")
             val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
+
             // all policies within the interval from previous to current resource event
-            val relevantPolicies = Policy.policies(referenceTimeslot)
+            clog.debug("Calling Policy.policies(%s)", referenceTimeslot)
+            val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, new DSL{})
 
             (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
 
@@ -218,18 +270,30 @@ trait Accounting extends DSLUtils with Loggable {
       case false ⇒
         // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
         // referring to (almost) an instant in time
+        clog.debug("DO NOT have previous event")
         val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
         val relevantPolicy = Policy.policy(occurredDate)
+        clog.debug("Calling Policy.policy(%s)", new MutableDateCalc(occurredDate))
         val relevantPolicies = Map(referenceTimeslot -> relevantPolicy)
 
         (referenceTimeslot, relevantPolicies, costPolicy.getResourceInstanceUndefinedAmount)
     }
+    clog.debug("previousValue = %s".format(previousValue))
+    clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+    clog.debug("relevantPolicies:")
+    clog.withIndent {
+      val timeslots = relevantPolicies.keysIterator
+      for(ts <- timeslots) {
+        clog.debug("%s: %s", ts, relevantPolicies(ts))
+      }
+    }
 
     val initialChargeslotsM = computeInitialChargeslots(
       referenceTimeslot,
       dslResource,
       relevantPolicies,
-      agreementNamesByTimeslot
+      agreementNamesByTimeslot,
+      Just(clog)
     )
     
     val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
@@ -274,7 +338,7 @@ trait Accounting extends DSLUtils with Loggable {
       }
     }
 
-    fullChargeslotsM match {
+    val result = fullChargeslotsM match {
       case Just(fullChargeslots) ⇒
         fullChargeslots
       case NoVal ⇒
@@ -282,10 +346,14 @@ trait Accounting extends DSLUtils with Loggable {
       case failed @ Failed(e, m) ⇒
         throw new Exception(m, e)
     }
+
+    clog.end()
+
+    result
   }
 
   /**
-   * Creates a list of wallet entries by examining the on the resource state.
+   * Create a list of wallet entries by charging for a resource event.
    *
    * @param currentResourceEvent The resource event to create charges for
    * @param agreements The user's agreement names, indexed by their
@@ -331,7 +399,7 @@ trait Accounting extends DSLUtils with Loggable {
 
     val walletEntries = aligned.map {
       x =>
-        // Retrieve agreement from policy valid at time of event
+        // Retrieve agreement from the policy valid at time of event
         val agreementName = agreements.find(y => y._1.contains(x)) match {
           case Some(x) => x
           case None => return Failed(new AccountingException(("Cannot find" +
@@ -342,11 +410,12 @@ trait Accounting extends DSLUtils with Loggable {
         val entries = chargeEvent(
           currentResourceEvent,
           Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
-            return Failed(new AccountingException("Cannot get agreement for "))
+            return Failed(new AccountingException("Cannot get agreement for %s".format()))
           ),
           previousAmount,
           previousOccurred,
-          related
+          related,
+          Some(x)
         ) match {
           case Just(x) => x
           case Failed(f, e) => return Failed(f,e)
@@ -362,26 +431,36 @@ trait Accounting extends DSLUtils with Loggable {
    * Creates a list of wallet entries by applying the agreement provisions on
    * the resource state.
    *
-   * @param currentResourceEvent The resource event to create charges for
+   * @param event The resource event to create charges for
    * @param agr The agreement implementation to use
    * @param previousAmount The current state of the resource
-   * @param previousOccurred
-   * @param related
-   * @return
+   * @param previousOccurred The timestamp of the previous event
+   * @param related Related wallet entries (TODO: should remove)
+   * @param chargeFor The duration for which the charge should be done.
+   *                  Should fall between the previous and current
+   *                  resource event boundaries
+   * @return A list of wallet entries, one for each charge chunk
    */
-  def chargeEvent(currentResourceEvent: ResourceEvent,
+  def chargeEvent(event: ResourceEvent,
                   agr: DSLAgreement,
                   previousAmount: Double,
                   previousOccurred: Date,
-                  related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
+                  related: List[WalletEntry],
+                  chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
 
-    if (!currentResourceEvent.validate())
+    // If chargeFor is defined, it should fall within the event time
+    // boundaries (NOTE: assert(true, ...) below never actually verifies this)
+    chargeFor.map{x => assert(true,
+      Timeslot(previousOccurred, new Date(event.occurredMillis)))}
+
+    if (!event.validate())
       return Failed(new AccountingException("Event not valid"))
 
     val policy = Policy.policy
-    val dslResource = policy.findResource(currentResourceEvent.resource) match {
+    val dslResource = policy.findResource(event.resource) match {
       case Some(x) => x
-      case None => return Failed(new AccountingException("No resource [%s]".format(currentResourceEvent.resource)))
+      case None => return Failed(
+        new AccountingException("No resource [%s]".format(event.resource)))
     }
 
     /* This is a safeguard against the special case where the last
@@ -390,14 +469,14 @@ trait Accounting extends DSLUtils with Loggable {
      * this is the first time the resource state has been recorded.
      * Charging in this case only makes sense for discrete resources.
      */
-    if (previousOccurred.getTime == currentResourceEvent.occurredMillis) {
+    if (previousOccurred.getTime == event.occurredMillis) {
       dslResource.costPolicy match {
         case DiscreteCostPolicy => //Ok
         case _ => return Some(List())
       }
     }
 
-    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), currentResourceEvent.value)
+    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
     val amount = creditCalculationValueM match {
       case failed @ Failed(_, _) ⇒
         return failed
@@ -418,35 +497,54 @@ trait Accounting extends DSLUtils with Loggable {
       case _ => true
     }
 
+    /*
+     * Get the timeslot for which this event will be charged. In case we
+     * have a discrete resource, we do not really care for the time duration
+     * of an event. To process all events in a uniform way, we create an
+     * artificial timeslot lasting the minimum amount of time. In all other
+     * cases, we first check whether a desired charge period was passed as
+     * an argument.
+     */
     val timeslot = dslResource.costPolicy match {
-      case DiscreteCostPolicy => Timeslot(new Date(currentResourceEvent.occurredMillis),
-        new Date(currentResourceEvent.occurredMillis + 1))
-      case _ => Timeslot(previousOccurred, new Date(currentResourceEvent.occurredMillis))
+      case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
+        new Date(event.occurredMillis))
+      case _ => chargeFor match {
+        case Some(x) => x
+        case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
+      }
     }
 
+    /*
+     * The following splits the chargeable timeslot into smaller timeslots to
+     * comply with different applicability periods for algorithms and
+     * pricelists defined by the provided agreement.
+     */
     val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
 
     val timeReceived = System.currentTimeMillis
 
-    val rel = related.map{x => x.sourceEventIDs}.flatten ++ List(currentResourceEvent.id)
+    val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
 
     val entries = chargeChunks.map { c=>
         WalletEntry(
           id = CryptoUtils.sha1(c.id),
-          occurredMillis = currentResourceEvent.occurredMillis,
+          occurredMillis = event.occurredMillis,
           receivedMillis = timeReceived,
           sourceEventIDs = rel,
           value = c.cost,
           reason = c.reason,
-          userId = currentResourceEvent.userId,
-          resource = currentResourceEvent.resource,
-          instanceId = currentResourceEvent.instanceId,
+          userId = event.userId,
+          resource = event.resource,
+          instanceId = event.instanceId,
           finalized = isFinal
         )
     }
     Just(entries)
   }
 
+  /**
+   * Create a list of charge chunks for the given agreement, volume, resource and timeslot.
+   */
   def calcChangeChunks(agr: DSLAgreement, volume: Double,
                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
 
@@ -464,10 +562,14 @@ trait Accounting extends DSLUtils with Loggable {
     }
   }
 
+  /**
+   * Get a list of charge chunks for discrete resources.
+   */
   private[logic]
   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
                                priChunked: Map[Timeslot, DSLPriceList],
                                volume: Double, res: DSLResource): List[ChargeChunk] = {
+    // In case of discrete resources, we expect only a single timeslot, shared by the algorithm and the pricelist
     assert(algChunked.size == 1)
     assert(priChunked.size == 1)
     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
@@ -478,6 +580,9 @@ trait Accounting extends DSLUtils with Loggable {
       algChunked.keySet.head, res))
   }
 
+  /**
+   * Get a list of charge chunks for continuous resources.
+   */
   private[logic]
   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
                                  priChunked: Map[Timeslot, DSLPriceList],
@@ -541,18 +646,32 @@ trait Accounting extends DSLUtils with Loggable {
    * result: List(Timeslot(a.from, b.to), Timeslot(b.to, a.to))
    */
   private[logic] def alignTimeslots(a: List[Timeslot],
-                                    b: List[Timeslot]): List[Timeslot] = {
+                                    b: List[Timeslot],
+                                    clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
+    val clog = ContextualLogger.fromOther(clogM, logger, "alignTimeslots()")
+    clog.begin()
+
+    ContextualLogger.debugList(clog, "a", a)
+    ContextualLogger.debugList(clog, "b", b)
+
     if (a.isEmpty) return b.tail
     if (b.isEmpty) return a.tail
     assert (a.head.from == b.head.from)
 
-    if (a.head.endsAfter(b.head)) {
-      a.head.slice(b.head.to) ::: alignTimeslots(a.tail, b.tail)
+    val clogJ = Just(clog)
+    val result = if (a.head.endsAfter(b.head)) {
+      clog.debug("Branch: a.head.endsAfter(b.head)")
+      a.head.slice(b.head.to) ::: alignTimeslots(a.tail, b.tail, clogJ)
     } else if (b.head.endsAfter(a.head)) {
-      b.head.slice(a.head.to) ::: alignTimeslots(a.tail, b.tail)
+      clog.debug("Branch: b.head.endsAfter(a.head)")
+      b.head.slice(a.head.to) ::: alignTimeslots(a.tail, b.tail, clogJ)
     } else {
-      a.head :: alignTimeslots(a.tail, b.tail)
+      clog.debug("Branch: !a.head.endsAfter(b.head) && !b.head.endsAfter(a.head)")
+      a.head :: alignTimeslots(a.tail, b.tail, clogJ)
     }
+
+    clog.end()
+    result
   }
 }