WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
index 97b7b02..b376bff 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
 
 package gr.grnet.aquarium.logic.accounting
 
+import gr.grnet.aquarium.util.shortClassNameOf
 import algorithm.CostPolicyAlgorithmCompiler
 import dsl._
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
 import collection.immutable.SortedMap
 import java.util.Date
-import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
-import gr.grnet.aquarium.user.Agreement
+import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
 import gr.grnet.aquarium.store.PolicyStore
-case class Chargeslot(startChargeMillis: Long, stopChargeMillis: Long, charge: Double)
+import gr.grnet.aquarium.event.{WalletEntry}
+import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
+
+/**
+ * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
+ *
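+ * @param startMillis The start of the timeslot, in milliseconds
+ * @param stopMillis The end of the timeslot, in milliseconds
+ * @param algorithmDefinition The definition of the algorithm that applies for the timeslot
+ * @param unitPrice The unit price that applies for the timeslot
+ * @param computedCredits The computed credits, or None if they have not been computed yet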
+ */
+case class Chargeslot(startMillis: Long,
+                      stopMillis: Long,
+                      algorithmDefinition: String,
+                      unitPrice: Double,
+                      computedCredits: Option[Double] = None) {
+
+  override def toString = "%s(%s, %s, %s, %s, %s)".format(
+    shortClassNameOf(this),
+    new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
+    new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
+    unitPrice,
+    computedCredits,
+    algorithmDefinition
+  )
+}
 
 /**
  * Methods for converting accounting events to wallet entries.
@@ -53,209 +80,288 @@ case class Chargeslot(startChargeMillis: Long, stopChargeMillis: Long, charge: D
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 trait Accounting extends DSLUtils with Loggable {
+  /**
+   * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
+   *
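+   * @param referenceTimeslot The timeslot to break up
+   * @param policyTimeslots The validity timeslots of the policies
+   * @param agreementTimeslots The validity timeslots of the agreements
+   * @return The timeslots obtained by aligning the policy and agreement timeslots to the reference timeslot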
+   */
+  protected
+  def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
+                                           policyTimeslots: List[Timeslot],
+                                           agreementTimeslots: List[Timeslot],
+                                           clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
+
+//    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
+//    clog.begin()
+
+    // Align policy and agreement validity timeslots to the referenceTimeslot
+    val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
+    val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
+
+//    clog.debug("referenceTimeslot = %s", referenceTimeslot)
+//    clog.debugSeq("alignedPolicyTimeslots", alignedPolicyTimeslots, 0)
+//    clog.debugSeq("alignedAgreementTimeslots", alignedAgreementTimeslots, 0)
+
+    val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
+//    clog.debugSeq("result", result, 1)
+//    clog.end()
+    result
+  }
+
+  /**
+   * Given a reference timeslot, we have to break it up into a series of timeslots where a particular
+   * algorithm and price unit are in effect.
+   *
+   */
+  protected
+  def resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot: Timeslot,
+                                              agreement: DSLAgreement,
+                                              clogOpt: Option[ContextualLogger] = None):
+          (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
+
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
+
+    // Note that most of the code is taken from calcChangeChunks()
+    val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
+    val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
+    val chargeChunks = splitChargeChunks(alg, pri)
+    val algorithmByTimeslot = chargeChunks._1
+    val pricelistByTimeslot = chargeChunks._2
+
+    assert(algorithmByTimeslot.size == pricelistByTimeslot.size)
+
+    (algorithmByTimeslot, pricelistByTimeslot)
+  }
 
-  def splitTimeslotsByPoliciesAndAgreements(bigTimeslot: Timeslot,
-                                            policies: SortedMap[Timeslot, DSLPolicy],
-                                            agreements: List[Agreement]): Map[Timeslot, DSLAgreement] = {
-    // TODO: Implement
-    //       Find the agreements and their respective timeslots, taking into account the policies
-    Map()
+  protected
+  def computeInitialChargeslots(referenceTimeslot: Timeslot,
+                                dslResource: DSLResource,
+                                policiesByTimeslot: Map[Timeslot, DSLPolicy],
+                                agreementNamesByTimeslot: Map[Timeslot, String],
+                                clogOpt: Option[ContextualLogger] = None): List[Chargeslot] = {
+
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "computeInitialChargeslots()")
+//    clog.begin()
+
+    val policyTimeslots = policiesByTimeslot.keySet
+    val agreementTimeslots = agreementNamesByTimeslot.keySet
+
+//    clog.debugMap("policiesByTimeslot", policiesByTimeslot, 1)
+//    clog.debugMap("agreementNamesByTimeslot", agreementNamesByTimeslot, 1)
+
+    def getPolicy(ts: Timeslot): DSLPolicy = {
+      policiesByTimeslot.find(_._1.contains(ts)).get._2
+    }
+    def getAgreementName(ts: Timeslot): String = {
+      agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
+    }
+
+    // 1. Round ONE: split time according to overlapping policies and agreements.
+//    clog.begin("ROUND 1")
+    val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
+//    clog.debugSeq("alignedTimeslots", alignedTimeslots, 1)
+//    clog.end("ROUND 1")
+
+    // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
+    //    fine-grained timeslots according to applicable algorithms.
+    //    Then pack the info into charge slots.
+//    clog.begin("ROUND 2")
+    val allChargeslots = for {
+      alignedTimeslot <- alignedTimeslots
+    } yield {
+//      val alignedTimeslotMsg = "alignedTimeslot = %s".format(alignedTimeslot)
+//      clog.begin(alignedTimeslotMsg)
+
+      val dslPolicy = getPolicy(alignedTimeslot)
+//      clog.debug("dslPolicy = %s", dslPolicy)
+      val agreementName = getAgreementName(alignedTimeslot)
+//      clog.debug("agreementName = %s", agreementName)
+      val agreementOpt = dslPolicy.findAgreement(agreementName)
+//      clog.debug("agreementOpt = %s", agreementOpt)
+
+      agreementOpt match {
+        case None ⇒
+          val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
+          clog.error("%s", errMsg)
+          throw new AquariumException(errMsg)
+
+        case Some(agreement) ⇒
+          // TODO: Factor this out, just like we did with:
+          // TODO:  val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
+          // Note that most of the code is already taken from calcChangeChunks()
+          val r = resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot, agreement, Some(clog))
+          val algorithmByTimeslot: Map[Timeslot, DSLAlgorithm] = r._1
+          val pricelistByTimeslot: Map[Timeslot, DSLPriceList] = r._2
+
+          // Now, the timeslots must be the same
+          val finegrainedTimeslots = algorithmByTimeslot.keySet
+
+          val chargeslots = for {
+            finegrainedTimeslot <- finegrainedTimeslots
+          } yield {
+//            val finegrainedTimeslotMsg = "finegrainedTimeslot = %s".format(finegrainedTimeslot)
+//            clog.begin(finegrainedTimeslotMsg)
+
+            val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+//            clog.debug("dslAlgorithm = %s", dslAlgorithm)
+//            clog.debugMap("dslAlgorithm.algorithms", dslAlgorithm.algorithms, 1)
+            val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
+//            clog.debug("dslPricelist = %s", dslPricelist)
+//            clog.debug("dslResource = %s", dslResource)
+            val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
+//            clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
+            val priceUnitOpt = dslPricelist.prices.get(dslResource)
+//            clog.debug("priceUnitOpt = %s", priceUnitOpt)
+
+            val chargeslot = (algorithmDefOpt, priceUnitOpt) match {
+              case (None, None) ⇒
+                throw new AquariumException(
+                  "Unknown algorithm and price unit for resource %s during %s".
+                    format(dslResource, finegrainedTimeslot))
+              case (None, _) ⇒
+                throw new AquariumException(
+                  "Unknown algorithm for resource %s during %s".
+                    format(dslResource, finegrainedTimeslot))
+              case (_, None) ⇒
+                throw new AquariumException(
+                  "Unknown price unit for resource %s during %s".
+                    format(dslResource, finegrainedTimeslot))
+              case (Some(algorithmDefinition), Some(priceUnit)) ⇒
+                Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
+            }
+
+//            clog.end(finegrainedTimeslotMsg)
+            chargeslot
+          }
+
+//          clog.end(alignedTimeslotMsg)
+          chargeslots.toList
+      }
+    }
+//    clog.end("ROUND 2")
+
+
+    val result = allChargeslots.flatten
+//    clog.debugSeq("result", allChargeslots, 1)
+//    clog.end()
+    result
   }
-  
-//  def computeChargeSlots(timeslot: Timeslot,
-//                         dslResource: DSLResource,
-//                         dslAgreement: DSLAgreement,
-//                         algorithmCompiler: CostPolicyAlgorithmCompiler,
-//                         costPolicyVars: Map[DSLCostPolicyVar, Any]): Maybe[Traversable[Chargeslot]] = Maybe {
-//    // TODO split further based on timeslot
-//    val ta = List((timeslot, dslAgreement))
-//    for {
-//      (timeslot, dslAgreement) <- ta
-//    } yield {
-//      val startMillis = timeslot.from.getTime
-//      val stopMillis  = timeslot.to.getTime
-//      val algorithmDefM = dslAgreement.algorithm.findDefinitionForResource(dslResource)
-//      algorithmDefM match {
-//        case Just(algorithmDef) ⇒
-//          val compiledM = algorithmCompiler.compile(algorithmDef)
-//
-//          compiledM match {
-//            case Just(compiled) ⇒
-//              val chargeM = compiled.apply(costPolicyVars)
-//              chargeM match {
-//                case Just(charge) ⇒
-//                  Chargeslot(startMillis, stopMillis, charge)
-//                case NoVal ⇒
-//                  throw new Exception("No charge could be computed")
-//                case failed @ Failed(e, _) ⇒
-//                  throw e
-//              }
-//
-//            case NoVal ⇒
-//              throw new Exception("Could not compile algorithm %s".format(algorithmDef))
-//
-//            case failed @ Failed(e, _) ⇒
-//              throw e
-//          }
-//          Nil
-//        case NoVal ⇒
-//          throw new Exception("Could not find charging algorith for resource %s".format(dslResource))
-//
-//        case failed @ Failed(e, _) ⇒
-//          throw e // TODO: handle better
-//      }
-//    }
-//  }
-  
+
   /**
-   * Compute the charge chunks generated by a particular resource event.
+   * Compute the charge slots generated by a particular resource event.
    *
    */
-  def computeChargeChunks(previousResourceEventM: Maybe[ResourceEvent],
-                          currentResourceEvent: ResourceEvent,
-                          oldCredits: Double,
-                          oldAmount: Double,
-                          newAmount: Double,
-                          dslResource: DSLResource,
-                          defaultResourceMap: DSLResourcesMap,
-                          alltimeAgreements: List[Agreement]): Maybe[Traversable[ChargeChunk]] = Maybe {
+  def computeFullChargeslots(previousResourceEventOpt: Option[ResourceEventModel],
+                             currentResourceEvent: ResourceEventModel,
+                             oldCredits: Double,
+                             oldTotalAmount: Double,
+                             newTotalAmount: Double,
+                             dslResource: DSLResource,
+                             defaultResourceMap: DSLResourcesMap,
+                             agreementNamesByTimeslot: Map[Timeslot, String],
+                             algorithmCompiler: CostPolicyAlgorithmCompiler,
+                             policyStore: PolicyStore,
+                             clogOpt: Option[ContextualLogger] = None): (Timeslot, List[Chargeslot]) = {
+
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "computeFullChargeslots()")
+//    clog.begin()
+
     val occurredDate = currentResourceEvent.occurredDate
+    val occurredMillis = currentResourceEvent.occurredMillis
     val costPolicy = dslResource.costPolicy
-    
-    costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
+
+    val dsl = new DSL{}
+    val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
       // We need a previous event
       case true ⇒
-        previousResourceEventM match {
+        previousResourceEventOpt match {
           // We have a previous event
-          case Just(previousResourceEvent) ⇒
-            val occurredDelta = currentResourceEvent occurredDeltaFrom previousResourceEvent
-            val bigTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
+          case Some(previousResourceEvent) ⇒
+//            clog.debug("Have previous event")
+//            clog.debug("previousValue = %s", previousResourceEvent.value)
+
+            val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
+//            clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+
             // all policies within the interval from previous to current resource event
-            val relevantPolicies = Policy.policies(bigTimeslot)
-            val dslAgreementByTimeslotFromPolicies = splitTimeslotsByPoliciesAndAgreements(bigTimeslot, relevantPolicies, alltimeAgreements)
-
-//            val allChargeslots = for {
-//              // Level 1: split into timeslots by taking into account the timeslots of policy and agreement versions
-//              (timeslot1, dslAgreement1) <- dslAgreementByTimeslotFromPolicies
-//              // Level 2: split into more fine-grained timeslots by taking into account
-//              chargeslot                 <- computeChargeSlots(timeslot1, dslResource, dslAgreement1)
-//            } yield {
-//              chargeslot
-//            }
-//
-//            allChargeslots
+//            clog.debug("Calling policyStore.loadAndSortPoliciesWithin(%s)", referenceTimeslot)
+            val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, dsl)
+//            clog.debugMap("==> relevantPolicies", relevantPolicies, 0)
+
+            (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
+
           // We do not have a previous event
-          case NoVal ⇒
-            throw new Exception(
+          case None ⇒
+            throw new AquariumException(
               "Unable to charge. No previous event given for %s".
-                format(currentResourceEvent.toDebugString(defaultResourceMap)))
-
-          // We could not obtain a previous event
-          case failed @ Failed(e, m) ⇒
-            throw new Exception(
-              "Unable to charge. Could not obtain previous event for %s".
-                format(currentResourceEvent.toDebugString(defaultResourceMap)))
+                format(currentResourceEvent.toDebugString()))
         }
-        
+
       // We do not need a previous event
       case false ⇒
         // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
-        // referring to an instant in time
-        val agreementAtOccurred = alltimeAgreements.head
+        // referring to (almost) an instant in time
+//        clog.debug("DO NOT have previous event")
+        val previousValue = costPolicy.getResourceInstanceUndefinedAmount
+//        clog.debug("previousValue = costPolicy.getResourceInstanceUndefinedAmount = %s", previousValue)
+
+        val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
+//        clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
+
+//        clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
+        val relevantPolicyOpt = policyStore.loadValidPolicyAt(occurredMillis, dsl)
+//        clog.debug("  ==> relevantPolicyM = %s", relevantPolicyM)
+
+        val relevantPolicies = relevantPolicyOpt match {
+          case Some(relevantPolicy) ⇒
+            Map(referenceTimeslot -> relevantPolicy)
+
+          case None ⇒
+            throw new AquariumInternalError("No relevant policy found for %s".format(referenceTimeslot))
+        }
+
+        (referenceTimeslot, relevantPolicies, previousValue)
     }
-//    val valueMap = costPolicy.makeValueMap(
-//      oldCredits,
-//      oldAmount,
-//      newAmount,
-//      previousResourceEventM.map(pe => currentResourceEvent.occurredMillis - pe.occurredMillis).getOr(0.0),
-//      previousResourceEventM.map(pe => pe.value).getOr(0.0),
-//      currentResourceEvent.value,
-//      1.0
-//    )
-
-
-    Nil
-  }
 
-  def chargeEvent2(oldResourceEventM: Maybe[ResourceEvent],
-                   newResourceEvent: ResourceEvent,
-                   dslAgreement: DSLAgreement,
-                   lastSnapshotDate: Date,
-                   related: Traversable[WalletEntry]): Maybe[Traversable[WalletEntry]] = {
-    Maybe {
-      val dslPolicy: DSLPolicy = Policy.policy // TODO: query based on time
-      val resourceEvent = newResourceEvent
-      dslPolicy.findResource(resourceEvent.resource) match {
-        case None ⇒
-          throw new AccountingException("No resource [%s]".format(resourceEvent.resource))
-        case Some(dslResource) ⇒
-
-          val costPolicy = dslResource.costPolicy
-          val isDiscrete = costPolicy.isDiscrete
-          val oldValueM = oldResourceEventM.map(_.value)
-          val newValue = newResourceEvent.value
-
-          /* This is a safeguard against the special case where the last
-          * resource state update, as marked by the lastUpdate parameter
-          * is equal to the time of the event occurrence. This means that
-          * this is the first time the resource state has been recorded.
-          * Charging in this case only makes sense for discrete resources.
-          */
-          if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) {
-            Just(List())
-          } else {
-            val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(oldValueM, newValue).forNoVal(Just(0.0))
-            for {
-              amount <- creditCalculationValueM
-            } yield {
-              // We don't do strict checking for all cases for OnOffPolicies as
-              // above, since this point won't be reached in case of error.
-              val isFinal = dslResource.costPolicy match {
-                case OnOffCostPolicy =>
-                  OnOffPolicyResourceState(oldValueM) match {
-                    case OnResourceState => false
-                    case OffResourceState => true
-                  }
-                case _ => true
-              }
-
-              val timeslot = dslResource.costPolicy match {
-                case DiscreteCostPolicy => Timeslot(new Date(resourceEvent.occurredMillis),
-                  new Date(resourceEvent.occurredMillis + 1))
-                case _ => Timeslot(lastSnapshotDate, new Date(resourceEvent.occurredMillis))
-              }
-
-              val chargeChunks = calcChangeChunks(dslAgreement, amount, dslResource, timeslot)
-
-              val timeReceived = System.currentTimeMillis
-
-              val rel = related.map{x => x.sourceEventIDs}.flatten ++ Traversable(resourceEvent.id)
-
-              val entries = chargeChunks.map {
-                chargedChunk ⇒
-                  WalletEntry(
-                    id = CryptoUtils.sha1(chargedChunk.id),
-                    occurredMillis = resourceEvent.occurredMillis,
-                    receivedMillis = timeReceived,
-                    sourceEventIDs = rel.toList,
-                    value = chargedChunk.cost,
-                    reason = chargedChunk.reason,
-                    userId = resourceEvent.userId,
-                    resource = resourceEvent.resource,
-                    instanceId = resourceEvent.instanceId,
-                    finalized = isFinal
-                  )
-              } // entries
-
-              entries
-            } // yield
-          } // else
-      }
-    }.flatten1
+    val initialChargeslots = computeInitialChargeslots(
+      referenceTimeslot,
+      dslResource,
+      relevantPolicies,
+      agreementNamesByTimeslot,
+      Some(clog)
+    )
+
+    val fullChargeslots = initialChargeslots.map {
+      case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
+        val execAlgorithm = algorithmCompiler.compile(algorithmDefinition)
+        val valueMap = costPolicy.makeValueMap(
+          oldCredits,
+          oldTotalAmount,
+          newTotalAmount,
+          stopMillis - startMillis,
+          previousValue,
+          currentResourceEvent.value,
+          unitPrice
+        )
+
+//              clog.debug("execAlgorithm = %s", execAlgorithm)
+        clog.debugMap("valueMap", valueMap, 1)
+
+        // This is it
+        val credits = execAlgorithm.apply(valueMap)
+        chargeslot.copy(computedCredits = Some(credits))
+    }
+
+    val result = referenceTimeslot -> fullChargeslots
+
+    result
   }
 
   /**
-   * Creates a list of wallet entries by examining the on the resource state.
+   * Create a list of wallet entries by charging for a resource event.
    *
    * @param currentResourceEvent The resource event to create charges for
    * @param agreements The user's agreement names, indexed by their
@@ -263,7 +369,7 @@ trait Accounting extends DSLUtils with Loggable {
    * @param previousAmount The current state of the resource
    * @param previousOccurred The last time the resource state was updated
    */
-  def chargeEvent(currentResourceEvent: ResourceEvent,
+  def chargeEvent(currentResourceEvent: ResourceEventModel,
                   agreements: SortedMap[Timeslot, String],
                   previousAmount: Double,
                   previousOccurred: Date,
@@ -279,9 +385,9 @@ trait Accounting extends DSLUtils with Loggable {
      * throughout the event's life.
      */
     assert(
-      agreements.keys.exists {
+      agreements.keysIterator.exists {
         p => p.includes(occuredDate)
-      } && agreements.keys.exists {
+      } && agreements.keysIterator.exists {
         p => p.includes(previousOccurred)
       }
     )
@@ -290,8 +396,8 @@ trait Accounting extends DSLUtils with Loggable {
 
     // Align policy and agreement validity timeslots to the event's boundaries
     val policyTimeslots = t.align(
-      Policy.policies(previousOccurred, occuredDate).keys.toList)
-    val agreementTimeslots = t.align(agreements.keys.toList)
+      Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
+    val agreementTimeslots = t.align(agreements.keysIterator.toList)
 
     /*
      * Get a set of timeslot slices covering the different durations of
@@ -301,7 +407,7 @@ trait Accounting extends DSLUtils with Loggable {
 
     val walletEntries = aligned.map {
       x =>
-        // Retrieve agreement from policy valid at time of event
+        // Retrieve agreement from the policy valid at time of event
         val agreementName = agreements.find(y => y._1.contains(x)) match {
           case Some(x) => x
           case None => return Failed(new AccountingException(("Cannot find" +
@@ -312,14 +418,15 @@ trait Accounting extends DSLUtils with Loggable {
         val entries = chargeEvent(
           currentResourceEvent,
           Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
-            return Failed(new AccountingException("Cannot get agreement for "))
+            return Failed(new AccountingException("Cannot get agreement for %s".format()))
           ),
           previousAmount,
           previousOccurred,
-          related
+          related,
+          Some(x)
         ) match {
           case Just(x) => x
-          case Failed(f, e) => return Failed(f,e)
+          case Failed(f) => return Failed(f)
           case NoVal => List()
         }
         entries
@@ -332,26 +439,36 @@ trait Accounting extends DSLUtils with Loggable {
    * Creates a list of wallet entries by applying the agreement provisions on
    * the resource state.
    *
-   * @param currentResourceEvent The resource event to create charges for
+   * @param event The resource event to create charges for
    * @param agr The agreement implementation to use
    * @param previousAmount The current state of the resource
-   * @param previousOccurred
-   * @param related
-   * @return
+   * @param previousOccurred The timestamp of the previous event
+   * @param related Related wallet entries (TODO: should remove)
+   * @param chargeFor The duration for which the charge should be done.
+   *                  Should fall between the previous and current
+   *                  resource event boundaries
+   * @return A list of wallet entries, one for each charge chunk
    */
-  def chargeEvent(currentResourceEvent: ResourceEvent,
+  def chargeEvent(event: ResourceEventModel,
                   agr: DSLAgreement,
                   previousAmount: Double,
                   previousOccurred: Date,
-                  related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
+                  related: List[WalletEntry],
+                  chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
 
-    if (!currentResourceEvent.validate())
-      return Failed(new AccountingException("Event not valid"))
+    // If chargeFor is defined, it should fall within the event time
+    // boundaries (NOTE: assert(true, ...) below never actually checks this)
+    chargeFor.map{x => assert(true,
+      Timeslot(previousOccurred, new Date(event.occurredMillis)))}
+
+//    if (!event.validate())
+//      return Failed(new AccountingException("Event not valid"))
 
     val policy = Policy.policy
-    val dslResource = policy.findResource(currentResourceEvent.resource) match {
+    val dslResource = policy.findResource(event.resource) match {
       case Some(x) => x
-      case None => return Failed(new AccountingException("No resource [%s]".format(currentResourceEvent.resource)))
+      case None => return Failed(
+        new AccountingException("No resource [%s]".format(event.resource)))
     }
 
     /* This is a safeguard against the special case where the last
@@ -360,16 +477,16 @@ trait Accounting extends DSLUtils with Loggable {
      * this is the first time the resource state has been recorded.
      * Charging in this case only makes sense for discrete resources.
      */
-    if (previousOccurred.getTime == currentResourceEvent.occurredMillis) {
+    if (previousOccurred.getTime == event.occurredMillis) {
       dslResource.costPolicy match {
         case DiscreteCostPolicy => //Ok
         case _ => return Some(List())
       }
     }
 
-    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), currentResourceEvent.value)
+    val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
     val amount = creditCalculationValueM match {
-      case failed @ Failed(_, _) ⇒
+      case failed @ Failed(_) ⇒
         return failed
       case Just(amount) ⇒
         amount
@@ -388,35 +505,54 @@ trait Accounting extends DSLUtils with Loggable {
       case _ => true
     }
 
+    /*
+     * Get the timeslot for which this event will be charged. In case we
+     * have a discrete resource, we do not really care for the time duration
+     * of an event. To process all events in a uniform way, we create an
+     * artificial timeslot lasting the minimum amount of time. In all other
+     * cases, we first check whether a desired charge period passed as
+     * an argument.
+     */
     val timeslot = dslResource.costPolicy match {
-      case DiscreteCostPolicy => Timeslot(new Date(currentResourceEvent.occurredMillis),
-        new Date(currentResourceEvent.occurredMillis + 1))
-      case _ => Timeslot(previousOccurred, new Date(currentResourceEvent.occurredMillis))
+      case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
+        new Date(event.occurredMillis))
+      case _ => chargeFor match {
+        case Some(x) => x
+        case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
+      }
     }
 
+    /*
+     * The following splits the chargeable timeslot into smaller timeslots to
+     * comply with different applicability periods for algorithms and
+     * pricelists defined by the provided agreement.
+     */
     val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
 
-    val timeReceived = System.currentTimeMillis
+    val timeReceived = TimeHelpers.nowMillis()
 
-    val rel = related.map{x => x.sourceEventIDs}.flatten ++ List(currentResourceEvent.id)
+    val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
 
     val entries = chargeChunks.map { c=>
         WalletEntry(
           id = CryptoUtils.sha1(c.id),
-          occurredMillis = currentResourceEvent.occurredMillis,
+          occurredMillis = event.occurredMillis,
           receivedMillis = timeReceived,
           sourceEventIDs = rel,
           value = c.cost,
           reason = c.reason,
-          userId = currentResourceEvent.userId,
-          resource = currentResourceEvent.resource,
-          instanceId = currentResourceEvent.instanceId,
+          userId = event.userID,
+          resource = event.resource,
+          instanceId = event.instanceID,
           finalized = isFinal
         )
     }
     Just(entries)
   }
 
+  /**
+   * Create a list of charge chunks for the given agreement, volume, resource and timeslot.
+   */
   def calcChangeChunks(agr: DSLAgreement, volume: Double,
                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
 
@@ -434,10 +570,14 @@ trait Accounting extends DSLUtils with Loggable {
     }
   }
 
+  /**
+   * Get a list of charge chunks for discrete resources.
+   */
   private[logic]
   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
                                priChunked: Map[Timeslot, DSLPriceList],
                                volume: Double, res: DSLResource): List[ChargeChunk] = {
+    // In case of discrete resources, we expect exactly one algorithm chunk and one price list chunk
     assert(algChunked.size == 1)
     assert(priChunked.size == 1)
     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
@@ -448,6 +588,9 @@ trait Accounting extends DSLUtils with Loggable {
       algChunked.keySet.head, res))
   }
 
+  /**
+   * Get a list of charge chunks for continuous resources.
+   */
   private[logic]
   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
                                  priChunked: Map[Timeslot, DSLPriceList],
@@ -512,16 +655,26 @@ trait Accounting extends DSLUtils with Loggable {
    */
   private[logic] def alignTimeslots(a: List[Timeslot],
                                     b: List[Timeslot]): List[Timeslot] = {
-    if (a.isEmpty) return b.tail
-    if (b.isEmpty) return a.tail
+
+    def safeTail(foo: List[Timeslot]) = foo match {
+      case Nil       => List()
+      case x :: Nil  => List()
+      case x :: rest => rest
+    }
+
+    if (a.isEmpty) return b
+    if (b.isEmpty) return a
+
     assert (a.head.from == b.head.from)
 
     if (a.head.endsAfter(b.head)) {
-      a.head.slice(b.head.to) ::: alignTimeslots(a.tail, b.tail)
+      val slice = a.head.slice(b.head.to)
+      slice.head :: alignTimeslots(slice.last :: a.tail, safeTail(b))
     } else if (b.head.endsAfter(a.head)) {
-      b.head.slice(a.head.to) ::: alignTimeslots(a.tail, b.tail)
+      val slice = b.head.slice(a.head.to)
+      slice.head :: alignTimeslots(safeTail(a), slice.last :: b.tail)
     } else {
-      a.head :: alignTimeslots(a.tail, b.tail)
+      a.head :: alignTimeslots(safeTail(a), safeTail(b))
     }
   }
 }
@@ -561,8 +714,8 @@ case class ChargeChunk(value: Double, algorithm: String,
 
   def id(): String =
     CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
-      resource.name, System.currentTimeMillis()))
+      resource.name, TimeHelpers.nowMillis()))
 }
 
 /** An exception raised when something goes wrong with accounting */
-class AccountingException(msg: String) extends Exception(msg)
+class AccountingException(msg: String) extends AquariumException(msg)