f7152005e36113a849d672f716f3f26a450f93eb
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.logic.accounting
37
38 import gr.grnet.aquarium.util.shortClassNameOf
39 import algorithm.CostPolicyAlgorithmCompiler
40 import dsl._
41 import collection.immutable.SortedMap
42 import java.util.Date
43 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
44 import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
45 import gr.grnet.aquarium.store.PolicyStore
46 import gr.grnet.aquarium.AquariumException
47 import gr.grnet.aquarium.event.{WalletEntry}
48 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
49 import gr.grnet.aquarium.event.resource.ResourceEventModel
50
51 /**
52  * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
53  *
54  * @param startMillis
55  * @param stopMillis
56  * @param algorithmDefinition
57  * @param unitPrice
58  * @param computedCredits The computed credits
59  */
60 case class Chargeslot(startMillis: Long,
61                       stopMillis: Long,
62                       algorithmDefinition: String,
63                       unitPrice: Double,
64                       computedCredits: Option[Double] = None) {
65
66   override def toString = "%s(%s, %s, %s, %s, %s)".format(
67     shortClassNameOf(this),
68     new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
69     new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
70     unitPrice,
71     computedCredits,
72     algorithmDefinition
73   )
74 }
75
76 /**
77  * Methods for converting accounting events to wallet entries.
78  *
79  * @author Georgios Gousios <gousiosg@gmail.com>
80  * @author Christos KK Loverdos <loverdos@gmail.com>
81  */
82 trait Accounting extends DSLUtils with Loggable {
83   /**
84    * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
85    *
86    * @param referenceTimeslot
87    * @param policyTimeslots
88    * @param agreementTimeslots
89    * @return
90    */
91   protected
92   def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
93                                            policyTimeslots: List[Timeslot],
94                                            agreementTimeslots: List[Timeslot],
95                                            clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
96
97 //    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
98 //    clog.begin()
99
100     // Align policy and agreement validity timeslots to the referenceTimeslot
101     val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
102     val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
103
104 //    clog.debug("referenceTimeslot = %s", referenceTimeslot)
105 //    clog.debugSeq("alignedPolicyTimeslots", alignedPolicyTimeslots, 0)
106 //    clog.debugSeq("alignedAgreementTimeslots", alignedAgreementTimeslots, 0)
107
108     val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
109 //    clog.debugSeq("result", result, 1)
110 //    clog.end()
111     result
112   }
113
114   /**
115    * Given a reference timeslot, we have to break it up to a series of timeslots where a particular
116    * algorithm and price unit is in effect.
117    *
118    */
119   protected
120   def resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot: Timeslot,
121                                               agreement: DSLAgreement,
122                                               clogM: Maybe[ContextualLogger] = NoVal): (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
123
124     val clog = ContextualLogger.fromOther(clogM, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
125
126     // Note that most of the code is taken from calcChangeChunks()
127     val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
128     val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
129     val chargeChunks = splitChargeChunks(alg, pri)
130     val algorithmByTimeslot = chargeChunks._1
131     val pricelistByTimeslot = chargeChunks._2
132
133     assert(algorithmByTimeslot.size == pricelistByTimeslot.size)
134
135     (algorithmByTimeslot, pricelistByTimeslot)
136   }
137
138   protected
139   def computeInitialChargeslots(referenceTimeslot: Timeslot,
140                                 dslResource: DSLResource,
141                                 policiesByTimeslot: Map[Timeslot, DSLPolicy],
142                                 agreementNamesByTimeslot: Map[Timeslot, String],
143                                 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
144
145     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeInitialChargeslots()")
146 //    clog.begin()
147
148     val policyTimeslots = policiesByTimeslot.keySet
149     val agreementTimeslots = agreementNamesByTimeslot.keySet
150
151 //    clog.debugMap("policiesByTimeslot", policiesByTimeslot, 1)
152 //    clog.debugMap("agreementNamesByTimeslot", agreementNamesByTimeslot, 1)
153
154     def getPolicy(ts: Timeslot): DSLPolicy = {
155       policiesByTimeslot.find(_._1.contains(ts)).get._2
156     }
157     def getAgreementName(ts: Timeslot): String = {
158       agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
159     }
160
161     // 1. Round ONE: split time according to overlapping policies and agreements.
162 //    clog.begin("ROUND 1")
163     val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
164 //    clog.debugSeq("alignedTimeslots", alignedTimeslots, 1)
165 //    clog.end("ROUND 1")
166
167     // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
168     //    fine-grained timeslots according to applicable algorithms.
169     //    Then pack the info into charge slots.
170 //    clog.begin("ROUND 2")
171     val allChargeslots = for {
172       alignedTimeslot <- alignedTimeslots
173     } yield {
174 //      val alignedTimeslotMsg = "alignedTimeslot = %s".format(alignedTimeslot)
175 //      clog.begin(alignedTimeslotMsg)
176
177       val dslPolicy = getPolicy(alignedTimeslot)
178 //      clog.debug("dslPolicy = %s", dslPolicy)
179       val agreementName = getAgreementName(alignedTimeslot)
180 //      clog.debug("agreementName = %s", agreementName)
181       val agreementOpt = dslPolicy.findAgreement(agreementName)
182 //      clog.debug("agreementOpt = %s", agreementOpt)
183
184       agreementOpt match {
185         case None ⇒
186           val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
187           clog.error("%s", errMsg)
188           throw new AquariumException(errMsg)
189
190         case Some(agreement) ⇒
191           // TODO: Factor this out, just like we did with:
192           // TODO:  val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
193           // Note that most of the code is already taken from calcChangeChunks()
194           val r = resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot, agreement, Just(clog))
195           val algorithmByTimeslot: Map[Timeslot, DSLAlgorithm] = r._1
196           val pricelistByTimeslot: Map[Timeslot, DSLPriceList] = r._2
197
198           // Now, the timeslots must be the same
199           val finegrainedTimeslots = algorithmByTimeslot.keySet
200
201           val chargeslots = for {
202             finegrainedTimeslot <- finegrainedTimeslots
203           } yield {
204 //            val finegrainedTimeslotMsg = "finegrainedTimeslot = %s".format(finegrainedTimeslot)
205 //            clog.begin(finegrainedTimeslotMsg)
206
207             val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
208 //            clog.debug("dslAlgorithm = %s", dslAlgorithm)
209 //            clog.debugMap("dslAlgorithm.algorithms", dslAlgorithm.algorithms, 1)
210             val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
211 //            clog.debug("dslPricelist = %s", dslPricelist)
212 //            clog.debug("dslResource = %s", dslResource)
213             val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
214 //            clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
215             val priceUnitOpt = dslPricelist.prices.get(dslResource)
216 //            clog.debug("priceUnitOpt = %s", priceUnitOpt)
217
218             val chargeslot = (algorithmDefOpt, priceUnitOpt) match {
219               case (None, None) ⇒
220                 throw new AquariumException(
221                   "Unknown algorithm and price unit for resource %s during %s".
222                     format(dslResource, finegrainedTimeslot))
223               case (None, _) ⇒
224                 throw new AquariumException(
225                   "Unknown algorithm for resource %s during %s".
226                     format(dslResource, finegrainedTimeslot))
227               case (_, None) ⇒
228                 throw new AquariumException(
229                   "Unknown price unit for resource %s during %s".
230                     format(dslResource, finegrainedTimeslot))
231               case (Some(algorithmDefinition), Some(priceUnit)) ⇒
232                 Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
233             }
234
235 //            clog.end(finegrainedTimeslotMsg)
236             chargeslot
237           }
238
239 //          clog.end(alignedTimeslotMsg)
240           chargeslots.toList
241       }
242     }
243 //    clog.end("ROUND 2")
244
245
246     val result = allChargeslots.flatten
247 //    clog.debugSeq("result", allChargeslots, 1)
248 //    clog.end()
249     result
250   }
251
252   /**
253    * Compute the charge slots generated by a particular resource event.
254    *
255    */
256   def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEventModel],
257                              currentResourceEvent: ResourceEventModel,
258                              oldCredits: Double,
259                              oldTotalAmount: Double,
260                              newTotalAmount: Double,
261                              dslResource: DSLResource,
262                              defaultResourceMap: DSLResourcesMap,
263                              agreementNamesByTimeslot: Map[Timeslot, String],
264                              algorithmCompiler: CostPolicyAlgorithmCompiler,
265                              policyStore: PolicyStore,
266                              contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[(Timeslot, List[Chargeslot])] = Maybe {
267
268     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeFullChargeslots()")
269 //    clog.begin()
270
271     val occurredDate = currentResourceEvent.occurredDate
272     val occurredMillis = currentResourceEvent.occurredMillis
273     val costPolicy = dslResource.costPolicy
274
275     val dsl = new DSL{}
276     val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
277       // We need a previous event
278       case true ⇒
279         previousResourceEventM match {
280           // We have a previous event
281           case Just(previousResourceEvent) ⇒
282 //            clog.debug("Have previous event")
283 //            clog.debug("previousValue = %s", previousResourceEvent.value)
284
285             val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
286 //            clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
287
288             // all policies within the interval from previous to current resource event
289 //            clog.debug("Calling policyStore.loadAndSortPoliciesWithin(%s)", referenceTimeslot)
290             val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, dsl)
291 //            clog.debugMap("==> relevantPolicies", relevantPolicies, 0)
292
293             (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
294
295           // We do not have a previous event
296           case NoVal ⇒
297             throw new AquariumException(
298               "Unable to charge. No previous event given for %s".
299                 format(currentResourceEvent.toDebugString()))
300
301           // We could not obtain a previous event
302           case failed @ Failed(e) ⇒
303             throw new AquariumException(
304               "Unable to charge. Could not obtain previous event for %s".
305                 format(currentResourceEvent.toDebugString()), e)
306         }
307
308       // We do not need a previous event
309       case false ⇒
310         // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
311         // referring to (almost) an instant in time
312 //        clog.debug("DO NOT have previous event")
313         val previousValue = costPolicy.getResourceInstanceUndefinedAmount
314 //        clog.debug("previousValue = costPolicy.getResourceInstanceUndefinedAmount = %s", previousValue)
315
316         val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
317 //        clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
318
319 //        clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
320         val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl)
321 //        clog.debug("  ==> relevantPolicyM = %s", relevantPolicyM)
322
323         val relevantPolicies = relevantPolicyM match {
324           case Just(relevantPolicy) ⇒
325             Map(referenceTimeslot -> relevantPolicy)
326           case NoVal ⇒
327             throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot))
328           case failed @ Failed(e) ⇒
329             throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot), e)
330
331         }
332
333         (referenceTimeslot, relevantPolicies, previousValue)
334     }
335
336     val initialChargeslotsM = computeInitialChargeslots(
337       referenceTimeslot,
338       dslResource,
339       relevantPolicies,
340       agreementNamesByTimeslot,
341       Just(clog)
342     )
343
344     val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
345       chargeslots.map {
346         case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
347           val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
348           execAlgorithmM match {
349             case NoVal ⇒
350               throw new AquariumException("Could not compile algorithm %s".format(algorithmDefinition))
351
352             case failed @ Failed(e) ⇒
353               failed.throwMe
354
355             case Just(execAlgorithm) ⇒
356               val valueMap = costPolicy.makeValueMap(
357                 oldCredits,
358                 oldTotalAmount,
359                 newTotalAmount,
360                 stopMillis - startMillis,
361                 previousValue,
362                 currentResourceEvent.value,
363                 unitPrice
364               )
365
366 //              clog.debug("execAlgorithm = %s", execAlgorithm)
367               clog.debugMap("valueMap", valueMap, 1)
368
369               // This is it
370               val creditsM = execAlgorithm.apply(valueMap)
371
372               creditsM match {
373                 case NoVal ⇒
374                   throw new AquariumException(
375                     "Could not compute credits for resource %s during %s".
376                       format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
377
378                 case failed @ Failed(e) ⇒
379                   failed.throwMe
380
381                 case Just(credits) ⇒
382                   chargeslot.copy(computedCredits = Some(credits))
383               }
384           }
385       }
386     }
387
388     val result = fullChargeslotsM match {
389       case Just(fullChargeslots) ⇒
390         referenceTimeslot -> fullChargeslots
391       case NoVal ⇒
392         null
393       case failed @ Failed(e) ⇒
394         failed.throwMe
395     }
396
397 //    clog.end()
398
399     result
400   }
401
402   /**
403    * Create a list of wallet entries by charging for a resource event.
404    *
405    * @param currentResourceEvent The resource event to create charges for
406    * @param agreements The user's agreement names, indexed by their
407    *                   applicability timeslot
408    * @param previousAmount The current state of the resource
409    * @param previousOccurred The last time the resource state was updated
410    */
411   def chargeEvent(currentResourceEvent: ResourceEventModel,
412                   agreements: SortedMap[Timeslot, String],
413                   previousAmount: Double,
414                   previousOccurred: Date,
415                   related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
416
417     assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
418     val occuredDate = new Date(currentResourceEvent.occurredMillis)
419
420     /* The following makes sure that agreements exist between the start
421      * and end days of the processed event. As agreement updates are
422      * guaranteed not to leave gaps, this means that the event can be
423      * processed correctly, as at least one agreement will be valid
424      * throughout the event's life.
425      */
426     assert(
427       agreements.keysIterator.exists {
428         p => p.includes(occuredDate)
429       } && agreements.keysIterator.exists {
430         p => p.includes(previousOccurred)
431       }
432     )
433
434     val t = Timeslot(previousOccurred, occuredDate)
435
436     // Align policy and agreement validity timeslots to the event's boundaries
437     val policyTimeslots = t.align(
438       Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
439     val agreementTimeslots = t.align(agreements.keysIterator.toList)
440
441     /*
442      * Get a set of timeslot slices covering the different durations of
443      * agreements and policies.
444      */
445     val aligned = alignTimeslots(policyTimeslots, agreementTimeslots)
446
447     val walletEntries = aligned.map {
448       x =>
449         // Retrieve agreement from the policy valid at time of event
450         val agreementName = agreements.find(y => y._1.contains(x)) match {
451           case Some(x) => x
452           case None => return Failed(new AccountingException(("Cannot find" +
453             " user agreement for period %s").format(x)))
454         }
455
456         // Do the wallet entry calculation
457         val entries = chargeEvent(
458           currentResourceEvent,
459           Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
460             return Failed(new AccountingException("Cannot get agreement for %s".format()))
461           ),
462           previousAmount,
463           previousOccurred,
464           related,
465           Some(x)
466         ) match {
467           case Just(x) => x
468           case Failed(f) => return Failed(f)
469           case NoVal => List()
470         }
471         entries
472     }.flatten
473
474     Just(walletEntries)
475   }
476
477   /**
478    * Creates a list of wallet entries by applying the agreement provisions on
479    * the resource state.
480    *
481    * @param event The resource event to create charges for
482    * @param agr The agreement implementation to use
483    * @param previousAmount The current state of the resource
484    * @param previousOccurred The timestamp of the previous event
485    * @param related Related wallet entries (TODO: should remove)
486    * @param chargeFor The duration for which the charge should be done.
487    *                  Should fall between the previous and current
488    *                  resource event boundaries
489    * @return A list of wallet entries, one for each
490    */
491   def chargeEvent(event: ResourceEventModel,
492                   agr: DSLAgreement,
493                   previousAmount: Double,
494                   previousOccurred: Date,
495                   related: List[WalletEntry],
496                   chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
497
498     // If chargeFor is not null, make sure it falls within
499     // event time boundaries
500     chargeFor.map{x => assert(true,
501       Timeslot(previousOccurred, new Date(event.occurredMillis)))}
502
503 //    if (!event.validate())
504 //      return Failed(new AccountingException("Event not valid"))
505
506     val policy = Policy.policy
507     val dslResource = policy.findResource(event.resource) match {
508       case Some(x) => x
509       case None => return Failed(
510         new AccountingException("No resource [%s]".format(event.resource)))
511     }
512
513     /* This is a safeguard against the special case where the last
514      * resource state update, as marked by the lastUpdate parameter
515      * is equal to the time of the event occurrence. This means that
516      * this is the first time the resource state has been recorded.
517      * Charging in this case only makes sense for discrete resources.
518      */
519     if (previousOccurred.getTime == event.occurredMillis) {
520       dslResource.costPolicy match {
521         case DiscreteCostPolicy => //Ok
522         case _ => return Some(List())
523       }
524     }
525
526     val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
527     val amount = creditCalculationValueM match {
528       case failed @ Failed(_) ⇒
529         return failed
530       case Just(amount) ⇒
531         amount
532       case NoVal ⇒
533         0.0
534     }
535
536     // We don't do strict checking for all cases for OnOffPolicies as
537     // above, since this point won't be reached in case of error.
538     val isFinal = dslResource.costPolicy match {
539       case OnOffCostPolicy =>
540         OnOffPolicyResourceState(previousAmount) match {
541           case OnResourceState => false
542           case OffResourceState => true
543         }
544       case _ => true
545     }
546
547     /*
548      * Get the timeslot for which this event will be charged. In case we
549      * have a discrete resource, we do not really care for the time duration
550      * of an event. To process all events in a uniform way, we create an
551      * artificial timeslot lasting the minimum amount of time. In all other
552      * cases, we first check whether a desired charge period passed as
553      * an argument.
554      */
555     val timeslot = dslResource.costPolicy match {
556       case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
557         new Date(event.occurredMillis))
558       case _ => chargeFor match {
559         case Some(x) => x
560         case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
561       }
562     }
563
564     /*
565      * The following splits the chargable timeslot into smaller timeslots to
566      * comply with different applicability periods for algorithms and
567      * pricelists defined by the provided agreement.
568      */
569     val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
570
571     val timeReceived = TimeHelpers.nowMillis()
572
573     val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
574
575     val entries = chargeChunks.map { c=>
576         WalletEntry(
577           id = CryptoUtils.sha1(c.id),
578           occurredMillis = event.occurredMillis,
579           receivedMillis = timeReceived,
580           sourceEventIDs = rel,
581           value = c.cost,
582           reason = c.reason,
583           userId = event.userID,
584           resource = event.resource,
585           instanceId = event.instanceID,
586           finalized = isFinal
587         )
588     }
589     Just(entries)
590   }
591
592   /**
593    * Create a
594    */
595   def calcChangeChunks(agr: DSLAgreement, volume: Double,
596                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
597
598     val alg = resolveEffectiveAlgorithmsForTimeslot(t, agr)
599     val pri = resolveEffectivePricelistsForTimeslot(t, agr)
600     val chunks = splitChargeChunks(alg, pri)
601     val algChunked = chunks._1
602     val priChunked = chunks._2
603
604     assert(algChunked.size == priChunked.size)
605
606     res.costPolicy match {
607       case DiscreteCostPolicy => calcChargeChunksDiscrete(algChunked, priChunked, volume, res)
608       case _ => calcChargeChunksContinuous(algChunked, priChunked, volume, res)
609     }
610   }
611
612   /**
613    * Get a list of charge chunks for discrete resources.
614    */
615   private[logic]
616   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
617                                priChunked: Map[Timeslot, DSLPriceList],
618                                volume: Double, res: DSLResource): List[ChargeChunk] = {
619     // In case of descrete resources, we only a expect a
620     assert(algChunked.size == 1)
621     assert(priChunked.size == 1)
622     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
623
624     List(ChargeChunk(volume,
625       algChunked.valuesIterator.next.algorithms.getOrElse(res, ""),
626       priChunked.valuesIterator.next.prices.getOrElse(res, 0),
627       algChunked.keySet.head, res))
628   }
629
630   /**
631    * Get a list of charge chunks for continuous resources.
632    */
633   private[logic]
634   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
635                                  priChunked: Map[Timeslot, DSLPriceList],
636                                  volume: Double, res: DSLResource): List[ChargeChunk] = {
637     algChunked.keysIterator.map {
638       x =>
639         ChargeChunk(volume,
640           algChunked.get(x).get.algorithms.getOrElse(res, ""),
641           priChunked.get(x).get.prices.getOrElse(res, 0), x, res)
642     }.toList
643   }
644
645   /**
646    * Align charge timeslots between algorithms and pricelists. As algorithm
647    * and pricelists can have different effectivity periods, this method
648    * examines them and splits them as necessary.
649    */
650   private[logic] def splitChargeChunks(alg: SortedMap[Timeslot, DSLAlgorithm],
651                                        price: SortedMap[Timeslot, DSLPriceList]) :
652     (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
653
654     val zipped = alg.keySet.zip(price.keySet)
655
656     zipped.find(p => !p._1.equals(p._2)) match {
657       case None => (alg, price)
658       case Some(x) =>
659         val algTimeslot = x._1
660         val priTimeslot = x._2
661
662         assert(algTimeslot.from == priTimeslot.from)
663
664         if (algTimeslot.endsAfter(priTimeslot)) {
665           val slices = algTimeslot.slice(priTimeslot.to)
666           val algo = alg.get(algTimeslot).get
667           val newalg = alg - algTimeslot ++ Map(slices.apply(0) -> algo) ++ Map(slices.apply(1) -> algo)
668           splitChargeChunks(newalg, price)
669         }
670         else {
671           val slices = priTimeslot.slice(priTimeslot.to)
672           val pl = price.get(priTimeslot).get
673           val newPrice = price - priTimeslot ++ Map(slices.apply(0) -> pl) ++ Map(slices.apply(1) -> pl)
674           splitChargeChunks(alg, newPrice)
675         }
676     }
677   }
678
679   /**
680    * Given two lists of timeslots, produce a list which contains the
681    * set of timeslot slices, as those are defined by
682    * timeslot overlaps.
683    *
684    * For example, given the timeslots a and b below, split them as shown.
685    *
686    * a = |****************|
687    *     ^                ^
688    *   a.from            a.to
689    * b = |*********|
690    *     ^         ^
691    *   b.from     b.to
692    *
693    * result: List(Timeslot(a.from, b.to), Timeslot(b.to, a.to))
694    */
695   private[logic] def alignTimeslots(a: List[Timeslot],
696                                     b: List[Timeslot]): List[Timeslot] = {
697
698     def safeTail(foo: List[Timeslot]) = foo match {
699       case Nil       => List()
700       case x :: Nil  => List()
701       case x :: rest => rest
702     }
703
704     if (a.isEmpty) return b
705     if (b.isEmpty) return a
706
707     assert (a.head.from == b.head.from)
708
709     if (a.head.endsAfter(b.head)) {
710       val slice = a.head.slice(b.head.to)
711       slice.head :: alignTimeslots(slice.last :: a.tail, safeTail(b))
712     } else if (b.head.endsAfter(a.head)) {
713       val slice = b.head.slice(a.head.to)
714       slice.head :: alignTimeslots(safeTail(a), slice.last :: b.tail)
715     } else {
716       a.head :: alignTimeslots(safeTail(a), safeTail(b))
717     }
718   }
719 }
720
721 /**
722  * Encapsulates a computation for a specific timeslot of
723  * resource usage.
724  */
725 case class ChargeChunk(value: Double, algorithm: String,
726                        price: Double, when: Timeslot,
727                        resource: DSLResource) {
728   assert(value > 0)
729   assert(!algorithm.isEmpty)
730   assert(resource != null)
731
732   def cost(): Double =
733     //TODO: Apply the algorithm, when we start parsing it
734     resource.costPolicy match {
735       case DiscreteCostPolicy =>
736         value * price
737       case _ =>
738         value * price * when.hours
739     }
740
741   def reason(): String =
742     resource.costPolicy match {
743       case DiscreteCostPolicy =>
744         "%f %s at %s @ %f/%s".format(value, resource.unit, when.from, price,
745           resource.unit)
746       case ContinuousCostPolicy =>
747         "%f %s of %s from %s to %s @ %f/%s".format(value, resource.unit,
748           resource.name, when.from, when.to, price, resource.unit)
749       case OnOffCostPolicy =>
750         "%f %s of %s from %s to %s @ %f/%s".format(when.hours, resource.unit,
751           resource.name, when.from, when.to, price, resource.unit)
752     }
753
754   def id(): String =
755     CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
756       resource.name, TimeHelpers.nowMillis()))
757 }
758
759 /** An exception raised when something goes wrong with accounting */
760 class AccountingException(msg: String) extends AquariumException(msg)