Preparing the move to master
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
index f1662e3..efdc67f 100644 (file)
 
 package gr.grnet.aquarium.logic.accounting
 
+import algorithm.CostPolicyAlgorithmCompiler
 import dsl._
 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
 import collection.immutable.SortedMap
 import java.util.Date
-import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
+import gr.grnet.aquarium.util.date.MutableDateCalc
+import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
 import gr.grnet.aquarium.store.PolicyStore
 
 /**
+ * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
+ *
+ * @param startMillis Start of the timeslot, in milliseconds (a `java.util.Date` time value)
+ * @param stopMillis End of the timeslot, in milliseconds (a `java.util.Date` time value)
+ * @param algorithmDefinition The algorithm definition that applies to the resource during the timeslot
+ * @param unitPrice The unit price that applies to the resource during the timeslot
+ * @param computedCredits The computed credits; `None` as long as they have not been computed
+ */
+case class Chargeslot(startMillis: Long,
+                      stopMillis: Long,
+                      algorithmDefinition: String,
+                      unitPrice: Double,
+                      computedCredits: Option[Double] = None)
+
+/**
  * Methods for converting accounting events to wallet entries.
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 trait Accounting extends DSLUtils with Loggable {
+  /**
+   * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
+   *
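+   * @param referenceTimeslot The timeslot to break up; policy and agreement timeslots are aligned to it
+   * @param policyTimeslots The validity timeslots of the policies
+   * @param agreementTimeslots The validity timeslots of the agreements
+   * @return The timeslots obtained by aligning the policy timeslots with the agreement timeslots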
+   */
+  protected
+  def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
+                                           policyTimeslots: List[Timeslot],
+                                           agreementTimeslots: List[Timeslot],
+                                           clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
 
-  def computeWalletEntries(userId: String,
-                           costPolicy: DSLCostPolicy,
-                           previousResourceEventM: Maybe[ResourceEvent],
-                           previousAccumulatingAmount: Double,
-                           currentResourceEvent: ResourceEvent,
-                           resourcesMap: DSLResourcesMap,
-                           policyStore: PolicyStore): Maybe[Traversable[WalletEntry]] = Maybe {
-    val resource   = currentResourceEvent.resource
-    val instanceId = currentResourceEvent.instanceId
-    val currentValue = currentResourceEvent.value
-
-    // Validations
-    // 1. Validate cost policy
-    val actualCostPolicyM = currentResourceEvent.findCostPolicyM(resourcesMap)
-    val currentResourceEventDebugStr = currentResourceEvent.toDebugString(resourcesMap, false)
-    actualCostPolicyM match {
-      case Just(actualCostPolicy) ⇒
-        if(costPolicy != actualCostPolicy) {
-          throw new Exception("Actual cost policy %s is not the same as provided %s for event %s".
-            format(actualCostPolicy, costPolicy, currentResourceEventDebugStr))
-        }
-      case _ ⇒
-        throw new Exception("Could not verify cost policy %s for event %s".
-          format(costPolicy, currentResourceEventDebugStr))
+    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
+    clog.begin()
+
+    // Align policy and agreement validity timeslots to the referenceTimeslot
+    val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
+    val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
+
+    ContextualLogger.debugList(clog, "alignedPolicyTimeslots", alignedPolicyTimeslots)
+    ContextualLogger.debugList(clog, "alignedAgreementTimeslots", alignedAgreementTimeslots)
+
+    val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots, Just(clog))
+    clog.end()
+    result
+  }
+
+  /**
+   * Given a reference timeslot, we have to break it up to a series of timeslots where a particular
+   * algorithm and price unit is in effect.
+   *
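+   * @param referenceTimeslot The timeslot to break up
+   * @param policiesByTimeslot The policies, indexed by their validity timeslot
+   * @param agreementNamesByTimeslot The agreement names, indexed by their validity timeslot
+   * @return The chargeslots for the reference timeslot, without computed credits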
+   */
+  protected
+  def computeInitialChargeslots(referenceTimeslot: Timeslot,
+                                dslResource: DSLResource,
+                                policiesByTimeslot: Map[Timeslot, DSLPolicy],
+                                agreementNamesByTimeslot: Map[Timeslot, String],
+                                contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
+
+    val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeInitialChargeslots()")
+    clog.begin()
+
+    val policyTimeslots = policiesByTimeslot.keySet
+    val agreementTimeslots = agreementNamesByTimeslot.keySet
+
+    clog.debug("policiesByTimeslot:")
+    clog.withIndent {
+      policyTimeslots.foreach(pt ⇒ clog.debug("%s: %s", pt,  policiesByTimeslot(pt)))
     }
-    // 2. Validate previous resource event
-    previousResourceEventM match {
-      case Just(previousResourceEvent) ⇒
-        if(!costPolicy.needsPreviousEventForCreditAndAmountCalculation) {
-          throw new Exception("Provided previous event but cost policy %s does not need one".format(costPolicy))
-        }
-        // 3. resource and instanceId
-        val previousResource = previousResourceEvent.resource
-        val previousInstanceId = previousResourceEvent.instanceId
-        (resource == previousResource, instanceId == previousInstanceId) match {
-          case (true, true)  ⇒
-
-          case (true, false) ⇒
-            throw new Exception("Resource instance IDs do not match (%s vs %s) for current and previous event".
-              format(instanceId, previousInstanceId))
-
-          case (false, true) ⇒
-            throw new Exception("Resource types do not match (%s vs %s) for current and previous event".
-              format(resource, previousResource))
-
-          case (false, false) ⇒
-            throw new Exception("Resource types and instance IDs do not match (%s vs %s) for current and previous event".
-              format((resource, instanceId), (previousResource, previousInstanceId)))
+    clog.debug("agreementNamesByTimeslot:")
+    clog.withIndent {
+      agreementTimeslots.foreach(at ⇒ clog.debug("%s: %s", at, agreementNamesByTimeslot(at)))
+    }
+
+    def getPolicy(ts: Timeslot): DSLPolicy = {
+      policiesByTimeslot.find(_._1.contains(ts)).get._2
+    }
+    def getAgreementName(ts: Timeslot): String = {
+      agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
+    }
+
+    // 1. Round ONE: split time according to overlapping policies and agreements.
+    val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
+    clog.debug("ROUND 1: alignedTimeslots:")
+    clog.withIndent {
+      alignedTimeslots.foreach(ts ⇒ clog.debug("%s", ts))
+    }
+
+    // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
+    //    fine-grained timeslots according to applicable algorithms.
+    //    Then pack the info into charge slots.
+    clog.debug("ROUND 2")
+    clog.indent()
+    val allChargeslots = for {
+      alignedTimeslot <- alignedTimeslots
+    } yield {
+      val dslPolicy = getPolicy(alignedTimeslot)
+      val agreementName = getAgreementName(alignedTimeslot)
+      val agreementOpt = dslPolicy.findAgreement(agreementName)
+
+      agreementOpt match {
+        case None ⇒
+          val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
+          clog.error("%s", errMsg)
+          throw new Exception(errMsg)
+
+        case Some(agreement) ⇒
+          // TODO: Factor this out, just like we did with:
+          // TODO:  val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
+          // TODO: Note that most of the code is already taken from calcChangeChunks()
+          val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
+          val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
+          val chargeChunks = splitChargeChunks(alg, pri)
+          val algorithmByTimeslot = chargeChunks._1
+          val pricelistByTimeslot = chargeChunks._2
+
+          // Now, the timeslots must be the same
+          val finegrainedTimeslots = algorithmByTimeslot.keySet
+
+          val chargeslots = for {
+            finegrainedTimeslot <- finegrainedTimeslots
+          } yield {
+            val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+            val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+            val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
+            val priceUnitOpt = dslPricelist.prices.get(dslResource)
+
+            clog.debug("%s:", finegrainedTimeslot)
+            clog.withIndent {
+              clog.debug("dslAlgorithm = %s", dslAlgorithm)
+              clog.debug("dslPricelist = %s", dslPricelist)
+              clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
+              clog.debug("priceUnitOpt = %s", priceUnitOpt)
+            }
+
+            (algorithmDefOpt, priceUnitOpt) match {
+              case (None, None) ⇒
+                throw new Exception(
+                  "Unknown algorithm and price unit for resource %s during %s".
+                    format(dslResource.name, finegrainedTimeslot))
+              case (None, _) ⇒
+                throw new Exception(
+                  "Unknown algorithm for resource %s during %s".
+                    format(dslResource.name, finegrainedTimeslot))
+              case (_, None) ⇒
+                throw new Exception(
+                  "Unknown price unit for resource %s during %s".
+                    format(dslResource.name, finegrainedTimeslot))
+              case (Some(algorithmDefinition), Some(priceUnit)) ⇒
+                Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
+            }
+          }
+
+          chargeslots.toList
+      }
+    }
+    clog.unindent() // ROUND 2
+
+    clog.end()
+
+    allChargeslots.flatten
+  }
+
+  /**
+   * Compute the charge slots generated by a particular resource event.
+   *
+   */
+  def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEvent],
+                             currentResourceEvent: ResourceEvent,
+                             oldCredits: Double,
+                             oldTotalAmount: Double,
+                             newTotalAmount: Double,
+                             dslResource: DSLResource,
+                             defaultResourceMap: DSLResourcesMap,
+                             agreementNamesByTimeslot: Map[Timeslot, String],
+                             algorithmCompiler: CostPolicyAlgorithmCompiler,
+                             policyStore: PolicyStore,
+                             contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
+
+    val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeFullChargeslots()")
+    clog.begin()
+
+    val occurredDate = currentResourceEvent.occurredDate
+    val costPolicy = dslResource.costPolicy
+    
+    val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
+      // We need a previous event
+      case true ⇒
+        previousResourceEventM match {
+          // We have a previous event
+          case Just(previousResourceEvent) ⇒
+            clog.debug("Have previous event")
+            val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
+
+            // all policies within the interval from previous to current resource event
+            clog.debug("Calling Policy.policies(%s)", referenceTimeslot)
+            val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, new DSL{})
+
+            (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
+
+          // We do not have a previous event
+          case NoVal ⇒
+            throw new Exception(
+              "Unable to charge. No previous event given for %s".
+                format(currentResourceEvent.toDebugString(defaultResourceMap)))
+
+          // We could not obtain a previous event
+          case failed @ Failed(e, m) ⇒
+            throw new Exception(
+              "Unable to charge. Could not obtain previous event for %s".
+                format(currentResourceEvent.toDebugString(defaultResourceMap)))
         }
+        
+      // We do not need a previous event
+      case false ⇒
+        // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
+        // referring to (almost) an instant in time
+        clog.debug("DO NOT have previous event")
+        val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
+        val relevantPolicy = Policy.policy(occurredDate)
+        clog.debug("Calling Policy.policy(%s)", new MutableDateCalc(occurredDate))
+        val relevantPolicies = Map(referenceTimeslot -> relevantPolicy)
+
+        (referenceTimeslot, relevantPolicies, costPolicy.getResourceInstanceUndefinedAmount)
+    }
+    clog.debug("previousValue = %s".format(previousValue))
+    clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+    clog.debug("relevantPolicies:")
+    clog.withIndent {
+      val timeslots = relevantPolicies.keysIterator
+      for(ts <- timeslots) {
+        clog.debug("%s: %s", ts, relevantPolicies(ts))
+      }
+    }
+
+    val initialChargeslotsM = computeInitialChargeslots(
+      referenceTimeslot,
+      dslResource,
+      relevantPolicies,
+      agreementNamesByTimeslot,
+      Just(clog)
+    )
+    
+    val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
+      chargeslots.map {
+        case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
+          val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
+          execAlgorithmM match {
+            case NoVal ⇒
+              throw new Exception("Could not compile algorithm %s".format(algorithmDefinition))
+
+            case failed @ Failed(e, m) ⇒
+              throw new Exception(m, e)
+
+            case Just(execAlgorithm) ⇒
+              val valueMap = costPolicy.makeValueMap(
+                costPolicy.name,
+                oldCredits,
+                oldTotalAmount,
+                newTotalAmount,
+                stopMillis - startMillis,
+                previousValue,
+                currentResourceEvent.value,
+                unitPrice
+              )
+
+              // This is it
+              val creditsM = execAlgorithm.apply(valueMap)
+
+              creditsM match {
+                case NoVal ⇒
+                  throw new Exception(
+                    "Could not compute credits for resource %s during %s".
+                      format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
+
+                case failed @ Failed(e, m) ⇒
+                  throw new Exception(m, e)
+
+                case Just(credits) ⇒
+                  chargeslot.copy(computedCredits = Some(credits))
+              }
+          }
+      }
+    }
+
+    val result = fullChargeslotsM match {
+      case Just(fullChargeslots) ⇒
+        fullChargeslots
       case NoVal ⇒
-        if(costPolicy.needsPreviousEventForCreditAndAmountCalculation) {
-          throw new Exception("Did not provid previous event but cost policy %s needa one".format(costPolicy))
-        }
+        null
       case failed @ Failed(e, m) ⇒
-        throw new Exception("Error obtaining previous event".format(m), e)
+        throw new Exception(m, e)
     }
 
-    //
-    Nil
+    clog.end()
+
+    result
   }
 
-  def chargeEvent2( oldResourceEventM: Maybe[ResourceEvent],
-                   newResourceEvent: ResourceEvent,
-                   dslAgreement: DSLAgreement,
-                   lastSnapshotDate: Date,
-                   related: Traversable[WalletEntry]): Maybe[Traversable[WalletEntry]] = {
-    Maybe {
-      val dslPolicy: DSLPolicy = Policy.policy // TODO: query based on time
-      val resourceEvent = newResourceEvent
-      dslPolicy.findResource(resourceEvent.resource) match {
-        case None ⇒
-          throw new AccountingException("No resource [%s]".format(resourceEvent.resource))
-        case Some(dslResource) ⇒
-
-          val costPolicy = dslResource.costPolicy
-          val isDiscrete = costPolicy.isDiscrete
-          val oldValueM = oldResourceEventM.map(_.value)
-          val newValue = newResourceEvent.value
-
-          /* This is a safeguard against the special case where the last
-          * resource state update, as marked by the lastUpdate parameter
-          * is equal to the time of the event occurrence. This means that
-          * this is the first time the resource state has been recorded.
-          * Charging in this case only makes sense for discrete resources.
-          */
-          if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) {
-            Just(List())
-          } else {
-            val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(oldValueM, newValue).forNoVal(Just(0.0))
-            for {
-              amount <- creditCalculationValueM
-            } yield {
-              // We don't do strict checking for all cases for OnOffPolicies as
-              // above, since this point won't be reached in case of error.
-              val isFinal = dslResource.costPolicy match {
-                case OnOffCostPolicy =>
-                  OnOffPolicyResourceState(oldValueM) match {
-                    case OnResourceState => false
-                    case OffResourceState => true
-                  }
-                case _ => true
-              }
+  /**
+   * Create a list of wallet entries by charging for a resource event.
+   *
+   * @param currentResourceEvent The resource event to create charges for
+   * @param agreements The user's agreement names, indexed by their
+   *                   applicability timeslot
+   * @param previousAmount The current state of the resource
+   * @param previousOccurred The last time the resource state was updated
+   */
+  def chargeEvent(currentResourceEvent: ResourceEvent,
+                  agreements: SortedMap[Timeslot, String],
+                  previousAmount: Double,
+                  previousOccurred: Date,
+                  related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
 
-              val timeslot = dslResource.costPolicy match {
-                case DiscreteCostPolicy => Timeslot(new Date(resourceEvent.occurredMillis),
-                  new Date(resourceEvent.occurredMillis + 1))
-                case _ => Timeslot(lastSnapshotDate, new Date(resourceEvent.occurredMillis))
-              }
+    assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
+    val occuredDate = new Date(currentResourceEvent.occurredMillis)
 
-              val chargeChunks = calcChangeChunks(dslAgreement, amount, dslResource, timeslot)
-
-              val timeReceived = System.currentTimeMillis
-
-              val rel = related.map{x => x.sourceEventIDs}.flatten ++ Traversable(resourceEvent.id)
-
-              val entries = chargeChunks.map {
-                chargedChunk ⇒
-                  WalletEntry(
-                    id = CryptoUtils.sha1(chargedChunk.id),
-                    occurredMillis = resourceEvent.occurredMillis,
-                    receivedMillis = timeReceived,
-                    sourceEventIDs = rel.toList,
-                    value = chargedChunk.cost,
-                    reason = chargedChunk.reason,
-                    userId = resourceEvent.userId,
-                    resource = resourceEvent.resource,
-                    instanceId = resourceEvent.instanceId,
-                    finalized = isFinal
-                  )
-              } // entries
-
-              entries
-            } // yield
-          } // else
+    /* The following makes sure that agreements exist between the start
+     * and end days of the processed event. As agreement updates are
+     * guaranteed not to leave gaps, this means that the event can be
+     * processed correctly, as at least one agreement will be valid
+     * throughout the event's life.
+     */
+    assert(
+      agreements.keysIterator.exists {
+        p => p.includes(occuredDate)
+      } && agreements.keysIterator.exists {
+        p => p.includes(previousOccurred)
       }
-    }.flatten1
+    )
+
+    val t = Timeslot(previousOccurred, occuredDate)
+
+    // Align policy and agreement validity timeslots to the event's boundaries
+    val policyTimeslots = t.align(
+      Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
+    val agreementTimeslots = t.align(agreements.keysIterator.toList)
+
+    /*
+     * Get a set of timeslot slices covering the different durations of
+     * agreements and policies.
+     */
+    val aligned = alignTimeslots(policyTimeslots, agreementTimeslots)
+
+    val walletEntries = aligned.map {
+      x =>
+        // Retrieve agreement from the policy valid at time of event
+        val agreementName = agreements.find(y => y._1.contains(x)) match {
+          case Some(x) => x
+          case None => return Failed(new AccountingException(("Cannot find" +
+            " user agreement for period %s").format(x)))
+        }
+
+        // Do the wallet entry calculation
+        val entries = chargeEvent(
+          currentResourceEvent,
+          Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
+            return Failed(new AccountingException("Cannot get agreement for %s".format()))
+          ),
+          previousAmount,
+          previousOccurred,
+          related,
+          Some(x)
+        ) match {
+          case Just(x) => x
+          case Failed(f, e) => return Failed(f,e)
+          case NoVal => List()
+        }
+        entries
+    }.flatten
+
+    Just(walletEntries)
   }
+
   /**
    * Creates a list of wallet entries by applying the agreement provisions on
    * the resource state.
    *
-   * @param resourceEvent The resource event to create charges for
-   * @param agreement The agreement applicable to the user mentioned in the event
-   * @param currentValue The current state of the resource
-   * @param currentSnapshotDate The last time the resource state was updated
+   * @param event The resource event to create charges for
+   * @param agr The agreement implementation to use
+   * @param previousAmount The current state of the resource
+   * @param previousOccurred The timestamp of the previous event
+   * @param related Related wallet entries (TODO: should remove)
+   * @param chargeFor The duration for which the charge should be done.
+   *                  Should fall between the previous and current
+   *                  resource event boundaries
+   * @return A list of wallet entries, one for each charge chunk
    */
-  def chargeEvent(resourceEvent: ResourceEvent,
-                  agreement: DSLAgreement,
-                  currentValue: Double,
-                  currentSnapshotDate: Date,
-                  related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
-
-    assert(currentSnapshotDate.getTime <= resourceEvent.occurredMillis)
-
-    if (!resourceEvent.validate())
+  def chargeEvent(event: ResourceEvent,
+                  agr: DSLAgreement,
+                  previousAmount: Double,
+                  previousOccurred: Date,
+                  related: List[WalletEntry],
+                  chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
+
+    // If chargeFor is defined, it should fall within the event time boundaries.
+    // NOTE(review): assert(true, ...) below can never fail, so nothing is checked yet.
+    chargeFor.map{x => assert(true,
+      Timeslot(previousOccurred, new Date(event.occurredMillis)))}
+
+    if (!event.validate())
       return Failed(new AccountingException("Event not valid"))
 
     val policy = Policy.policy
-    val dslResource = policy.findResource(resourceEvent.resource) match {
+    val dslResource = policy.findResource(event.resource) match {
       case Some(x) => x
-      case None => return Failed(new AccountingException("No resource [%s]".format(resourceEvent.resource)))
+      case None => return Failed(
+        new AccountingException("No resource [%s]".format(event.resource)))
     }
 
     /* This is a safeguard against the special case where the last
@@ -220,14 +469,14 @@ trait Accounting extends DSLUtils with Loggable {
      * this is the first time the resource state has been recorded.
      * Charging in this case only makes sense for discrete resources.
      */
-    if (currentSnapshotDate.getTime == resourceEvent.occurredMillis) {
+    if (previousOccurred.getTime == event.occurredMillis) {
       dslResource.costPolicy match {
         case DiscreteCostPolicy => //Ok
         case _ => return Some(List())
       }
     }
 
-    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(currentValue), resourceEvent.value)
+    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
     val amount = creditCalculationValueM match {
       case failed @ Failed(_, _) ⇒
         return failed
@@ -241,43 +490,61 @@ trait Accounting extends DSLUtils with Loggable {
     // above, since this point won't be reached in case of error.
     val isFinal = dslResource.costPolicy match {
       case OnOffCostPolicy =>
-        OnOffPolicyResourceState(currentValue) match {
+        OnOffPolicyResourceState(previousAmount) match {
           case OnResourceState => false
           case OffResourceState => true
         }
       case _ => true
     }
 
+    /*
+     * Get the timeslot for which this event will be charged. In case we
+     * have a discrete resource, we do not really care for the time duration
+     * of an event. To process all events in a uniform way, we create an
+     * artificial timeslot lasting the minimum amount of time. In all other
+     * cases, we first check whether a desired charge period was passed as
+     * an argument.
+     */
     val timeslot = dslResource.costPolicy match {
-      case DiscreteCostPolicy => Timeslot(new Date(resourceEvent.occurredMillis),
-        new Date(resourceEvent.occurredMillis + 1))
-      case _ => Timeslot(currentSnapshotDate, new Date(resourceEvent.occurredMillis))
+      case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
+        new Date(event.occurredMillis))
+      case _ => chargeFor match {
+        case Some(x) => x
+        case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
+      }
     }
 
-    val chargeChunks = calcChangeChunks(agreement, amount, dslResource, timeslot)
+    /*
+     * The following splits the chargeable timeslot into smaller timeslots to
+     * comply with different applicability periods for algorithms and
+     * pricelists defined by the provided agreement.
+     */
+    val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
 
     val timeReceived = System.currentTimeMillis
 
-    val rel = related.map{x => x.sourceEventIDs}.flatten ++ List(resourceEvent.id)
+    val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
 
-    val entries = chargeChunks.map {
-      c =>
+    val entries = chargeChunks.map { c=>
         WalletEntry(
           id = CryptoUtils.sha1(c.id),
-          occurredMillis = resourceEvent.occurredMillis,
+          occurredMillis = event.occurredMillis,
           receivedMillis = timeReceived,
           sourceEventIDs = rel,
           value = c.cost,
           reason = c.reason,
-          userId = resourceEvent.userId,
-          resource = resourceEvent.resource,
-          instanceId = resourceEvent.instanceId,
+          userId = event.userId,
+          resource = event.resource,
+          instanceId = event.instanceId,
           finalized = isFinal
         )
     }
     Just(entries)
   }
 
+  /**
+   * Create a list of charge chunks for the given agreement, volume, resource and timeslot.
+   */
   def calcChangeChunks(agr: DSLAgreement, volume: Double,
                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
 
@@ -295,10 +562,14 @@ trait Accounting extends DSLUtils with Loggable {
     }
   }
 
+  /**
+   * Get a list of charge chunks for discrete resources.
+   */
   private[logic]
   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
                                priChunked: Map[Timeslot, DSLPriceList],
                                volume: Double, res: DSLResource): List[ChargeChunk] = {
+    // In case of discrete resources, we only expect a single chunk
     assert(algChunked.size == 1)
     assert(priChunked.size == 1)
     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
@@ -309,6 +580,9 @@ trait Accounting extends DSLUtils with Loggable {
       algChunked.keySet.head, res))
   }
 
+  /**
+   * Get a list of charge chunks for continuous resources.
+   */
   private[logic]
   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
                                  priChunked: Map[Timeslot, DSLPriceList],
@@ -327,7 +601,7 @@ trait Accounting extends DSLUtils with Loggable {
    * examines them and splits them as necessary.
    */
   private[logic] def splitChargeChunks(alg: SortedMap[Timeslot, DSLAlgorithm],
-                        price: SortedMap[Timeslot, DSLPriceList]) :
+                                       price: SortedMap[Timeslot, DSLPriceList]) :
     (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
 
     val zipped = alg.keySet.zip(price.keySet)
@@ -354,8 +628,57 @@ trait Accounting extends DSLUtils with Loggable {
         }
     }
   }
+
+  /**
+   * Given two lists of timeslots, produce a list which contains the
+   * set of timeslot slices, as those are defined by
+   * timeslot overlaps.
+   *
+   * For example, given the timeslots a and b below, split them as shown.
+   *
+   * a = |****************|
+   *     ^                ^
+   *   a.from            a.to
+   * b = |*********|
+   *     ^         ^
+   *   b.from     b.to
+   *
+   * result: List(Timeslot(a.from, b.to), Timeslot(b.to, a.to))
+   *
+   * The heads of both lists are asserted to start at the same instant.
+   * When one list is exhausted, the other list is returned without its
+   * first element (NOTE(review): presumably because the slices produced in
+   * the previous step already cover it -- confirm). When both lists are
+   * exhausted the result is the empty list.
+   *
+   * @param a     The first list of timeslots
+   * @param b     The second list of timeslots
+   * @param clogM An optional logger from which the contextual logger of this call is derived
+   * @return The list of timeslot slices
+   */
+  private[logic] def alignTimeslots(a: List[Timeslot],
+                                    b: List[Timeslot],
+                                    clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
+    val clog = ContextualLogger.fromOther(clogM, logger, "alignTimeslots()")
+    clog.begin()
+
+    ContextualLogger.debugList(clog, "a", a)
+    ContextualLogger.debugList(clog, "b", b)
+
+    // drop(1) rather than tail: tail throws on an empty list, which is what
+    // the recursion reaches whenever both lists run out at the same time.
+    // The computation is one expression (no early return), so that the
+    // clog.end() below always balances the clog.begin() above.
+    val result = if (a.isEmpty) {
+      b.drop(1)
+    } else if (b.isEmpty) {
+      a.drop(1)
+    } else {
+      assert (a.head.from == b.head.from)
+
+      val clogJ = Just(clog)
+      if (a.head.endsAfter(b.head)) {
+        clog.debug("Branch: a.head.endsAfter(b.head)")
+        a.head.slice(b.head.to) ::: alignTimeslots(a.tail, b.tail, clogJ)
+      } else if (b.head.endsAfter(a.head)) {
+        clog.debug("Branch: b.head.endsAfter(a.head)")
+        b.head.slice(a.head.to) ::: alignTimeslots(a.tail, b.tail, clogJ)
+      } else {
+        clog.debug("Branch: !a.head.endsAfter(b.head) && !b.head.endsAfter(a.head)")
+        a.head :: alignTimeslots(a.tail, b.tail, clogJ)
+      }
+    }
+
+    clog.end()
+    result
+  }
 }
 
+/**
+ * Encapsulates a computation for a specific timeslot of
+ * resource usage.
+ */
 case class ChargeChunk(value: Double, algorithm: String,
                        price: Double, when: Timeslot,
                        resource: DSLResource) {