Copyright
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.logic.accounting
37
38 import gr.grnet.aquarium.util.shortClassNameOf
39 import algorithm.CostPolicyAlgorithmCompiler
40 import dsl._
41 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
42 import collection.immutable.SortedMap
43 import java.util.Date
44 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
45 import gr.grnet.aquarium.util.date.MutableDateCalc
46 import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
47 import gr.grnet.aquarium.store.PolicyStore
48
49 /**
50  * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
51  *
52  * @param startMillis
53  * @param stopMillis
54  * @param algorithmDefinition
55  * @param unitPrice
56  * @param computedCredits The computed credits
57  */
58 case class Chargeslot(startMillis: Long,
59                       stopMillis: Long,
60                       algorithmDefinition: String,
61                       unitPrice: Double,
62                       computedCredits: Option[Double] = None) {
63
64   override def toString = "%s(%s, %s, %s, %s, %s)".format(
65     shortClassNameOf(this),
66     new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
67     new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
68     unitPrice,
69     computedCredits,
70     algorithmDefinition
71   )
72 }
73
74 /**
75  * Methods for converting accounting events to wallet entries.
76  *
77  * @author Georgios Gousios <gousiosg@gmail.com>
78  * @author Christos KK Loverdos <loverdos@gmail.com>
79  */
80 trait Accounting extends DSLUtils with Loggable {
81   /**
82    * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
83    *
84    * @param referenceTimeslot
85    * @param policyTimeslots
86    * @param agreementTimeslots
87    * @return
88    */
89   protected
90   def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
91                                            policyTimeslots: List[Timeslot],
92                                            agreementTimeslots: List[Timeslot],
93                                            clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
94
95 //    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
96 //    clog.begin()
97
98     // Align policy and agreement validity timeslots to the referenceTimeslot
99     val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
100     val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
101
102 //    clog.debug("referenceTimeslot = %s", referenceTimeslot)
103 //    clog.debugSeq("alignedPolicyTimeslots", alignedPolicyTimeslots, 0)
104 //    clog.debugSeq("alignedAgreementTimeslots", alignedAgreementTimeslots, 0)
105
106     val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
107 //    clog.debugSeq("result", result, 1)
108 //    clog.end()
109     result
110   }
111
112   /**
113    * Given a reference timeslot, we have to break it up to a series of timeslots where a particular
114    * algorithm and price unit is in effect.
115    *
116    */
117   protected
118   def resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot: Timeslot,
119                                               agreement: DSLAgreement,
120                                               clogM: Maybe[ContextualLogger] = NoVal): (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
121
122     val clog = ContextualLogger.fromOther(clogM, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
123
124     // Note that most of the code is taken from calcChangeChunks()
125     val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
126     val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
127     val chargeChunks = splitChargeChunks(alg, pri)
128     val algorithmByTimeslot = chargeChunks._1
129     val pricelistByTimeslot = chargeChunks._2
130
131     assert(algorithmByTimeslot.size == pricelistByTimeslot.size)
132
133     (algorithmByTimeslot, pricelistByTimeslot)
134   }
135
136   protected
137   def computeInitialChargeslots(referenceTimeslot: Timeslot,
138                                 dslResource: DSLResource,
139                                 policiesByTimeslot: Map[Timeslot, DSLPolicy],
140                                 agreementNamesByTimeslot: Map[Timeslot, String],
141                                 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
142
143     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeInitialChargeslots()")
144 //    clog.begin()
145
146     val policyTimeslots = policiesByTimeslot.keySet
147     val agreementTimeslots = agreementNamesByTimeslot.keySet
148
149 //    clog.debugMap("policiesByTimeslot", policiesByTimeslot, 1)
150 //    clog.debugMap("agreementNamesByTimeslot", agreementNamesByTimeslot, 1)
151
152     def getPolicy(ts: Timeslot): DSLPolicy = {
153       policiesByTimeslot.find(_._1.contains(ts)).get._2
154     }
155     def getAgreementName(ts: Timeslot): String = {
156       agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
157     }
158
159     // 1. Round ONE: split time according to overlapping policies and agreements.
160 //    clog.begin("ROUND 1")
161     val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
162 //    clog.debugSeq("alignedTimeslots", alignedTimeslots, 1)
163 //    clog.end("ROUND 1")
164
165     // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
166     //    fine-grained timeslots according to applicable algorithms.
167     //    Then pack the info into charge slots.
168 //    clog.begin("ROUND 2")
169     val allChargeslots = for {
170       alignedTimeslot <- alignedTimeslots
171     } yield {
172 //      val alignedTimeslotMsg = "alignedTimeslot = %s".format(alignedTimeslot)
173 //      clog.begin(alignedTimeslotMsg)
174
175       val dslPolicy = getPolicy(alignedTimeslot)
176 //      clog.debug("dslPolicy = %s", dslPolicy)
177       val agreementName = getAgreementName(alignedTimeslot)
178 //      clog.debug("agreementName = %s", agreementName)
179       val agreementOpt = dslPolicy.findAgreement(agreementName)
180 //      clog.debug("agreementOpt = %s", agreementOpt)
181
182       agreementOpt match {
183         case None ⇒
184           val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
185           clog.error("%s", errMsg)
186           throw new Exception(errMsg)
187
188         case Some(agreement) ⇒
189           // TODO: Factor this out, just like we did with:
190           // TODO:  val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
191           // Note that most of the code is already taken from calcChangeChunks()
192           val r = resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot, agreement, Just(clog))
193           val algorithmByTimeslot: Map[Timeslot, DSLAlgorithm] = r._1
194           val pricelistByTimeslot: Map[Timeslot, DSLPriceList] = r._2
195
196           // Now, the timeslots must be the same
197           val finegrainedTimeslots = algorithmByTimeslot.keySet
198
199           val chargeslots = for {
200             finegrainedTimeslot <- finegrainedTimeslots
201           } yield {
202 //            val finegrainedTimeslotMsg = "finegrainedTimeslot = %s".format(finegrainedTimeslot)
203 //            clog.begin(finegrainedTimeslotMsg)
204
205             val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
206 //            clog.debug("dslAlgorithm = %s", dslAlgorithm)
207 //            clog.debugMap("dslAlgorithm.algorithms", dslAlgorithm.algorithms, 1)
208             val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
209 //            clog.debug("dslPricelist = %s", dslPricelist)
210 //            clog.debug("dslResource = %s", dslResource)
211             val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
212 //            clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
213             val priceUnitOpt = dslPricelist.prices.get(dslResource)
214 //            clog.debug("priceUnitOpt = %s", priceUnitOpt)
215
216             val chargeslot = (algorithmDefOpt, priceUnitOpt) match {
217               case (None, None) ⇒
218                 throw new Exception(
219                   "Unknown algorithm and price unit for resource %s during %s".
220                     format(dslResource, finegrainedTimeslot))
221               case (None, _) ⇒
222                 throw new Exception(
223                   "Unknown algorithm for resource %s during %s".
224                     format(dslResource, finegrainedTimeslot))
225               case (_, None) ⇒
226                 throw new Exception(
227                   "Unknown price unit for resource %s during %s".
228                     format(dslResource, finegrainedTimeslot))
229               case (Some(algorithmDefinition), Some(priceUnit)) ⇒
230                 Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
231             }
232
233 //            clog.end(finegrainedTimeslotMsg)
234             chargeslot
235           }
236
237 //          clog.end(alignedTimeslotMsg)
238           chargeslots.toList
239       }
240     }
241 //    clog.end("ROUND 2")
242
243
244     val result = allChargeslots.flatten
245 //    clog.debugSeq("result", allChargeslots, 1)
246 //    clog.end()
247     result
248   }
249
250   /**
251    * Compute the charge slots generated by a particular resource event.
252    *
253    */
254   def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEvent],
255                              currentResourceEvent: ResourceEvent,
256                              oldCredits: Double,
257                              oldTotalAmount: Double,
258                              newTotalAmount: Double,
259                              dslResource: DSLResource,
260                              defaultResourceMap: DSLResourcesMap,
261                              agreementNamesByTimeslot: Map[Timeslot, String],
262                              algorithmCompiler: CostPolicyAlgorithmCompiler,
263                              policyStore: PolicyStore,
264                              contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[(Timeslot, List[Chargeslot])] = Maybe {
265
266     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeFullChargeslots()")
267 //    clog.begin()
268
269     val occurredDate = currentResourceEvent.occurredDate
270     val occurredMillis = currentResourceEvent.occurredMillis
271     val costPolicy = dslResource.costPolicy
272
273     val dsl = new DSL{}
274     val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
275       // We need a previous event
276       case true ⇒
277         previousResourceEventM match {
278           // We have a previous event
279           case Just(previousResourceEvent) ⇒
280 //            clog.debug("Have previous event")
281 //            clog.debug("previousValue = %s", previousResourceEvent.value)
282
283             val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
284 //            clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
285
286             // all policies within the interval from previous to current resource event
287 //            clog.debug("Calling policyStore.loadAndSortPoliciesWithin(%s)", referenceTimeslot)
288             val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, dsl)
289 //            clog.debugMap("==> relevantPolicies", relevantPolicies, 0)
290
291             (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
292
293           // We do not have a previous event
294           case NoVal ⇒
295             throw new Exception(
296               "Unable to charge. No previous event given for %s".
297                 format(currentResourceEvent.toDebugString()))
298
299           // We could not obtain a previous event
300           case failed @ Failed(e, m) ⇒
301             throw new Exception(
302               "Unable to charge. Could not obtain previous event for %s".
303                 format(currentResourceEvent.toDebugString()), e)
304         }
305
306       // We do not need a previous event
307       case false ⇒
308         // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
309         // referring to (almost) an instant in time
310 //        clog.debug("DO NOT have previous event")
311         val previousValue = costPolicy.getResourceInstanceUndefinedAmount
312 //        clog.debug("previousValue = costPolicy.getResourceInstanceUndefinedAmount = %s", previousValue)
313
314         val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
315 //        clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
316
317 //        clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
318         val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl)
319 //        clog.debug("  ==> relevantPolicyM = %s", relevantPolicyM)
320
321         val relevantPolicies = relevantPolicyM match {
322           case Just(relevantPolicy) ⇒
323             Map(referenceTimeslot -> relevantPolicy)
324           case NoVal ⇒
325             throw new Exception("No relevant policy found for %s".format(referenceTimeslot))
326           case failed @ Failed(e, _) ⇒
327             throw new Exception("No relevant policy found for %s".format(referenceTimeslot), e)
328
329         }
330
331         (referenceTimeslot, relevantPolicies, previousValue)
332     }
333
334     val initialChargeslotsM = computeInitialChargeslots(
335       referenceTimeslot,
336       dslResource,
337       relevantPolicies,
338       agreementNamesByTimeslot,
339       Just(clog)
340     )
341
342     val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
343       chargeslots.map {
344         case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
345           val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
346           execAlgorithmM match {
347             case NoVal ⇒
348               throw new Exception("Could not compile algorithm %s".format(algorithmDefinition))
349
350             case failed @ Failed(e, m) ⇒
351               throw new Exception(m, e)
352
353             case Just(execAlgorithm) ⇒
354               val valueMap = costPolicy.makeValueMap(
355                 oldCredits,
356                 oldTotalAmount,
357                 newTotalAmount,
358                 stopMillis - startMillis,
359                 previousValue,
360                 currentResourceEvent.value,
361                 unitPrice
362               )
363
364 //              clog.debug("execAlgorithm = %s", execAlgorithm)
365               clog.debugMap("valueMap", valueMap, 1)
366
367               // This is it
368               val creditsM = execAlgorithm.apply(valueMap)
369
370               creditsM match {
371                 case NoVal ⇒
372                   throw new Exception(
373                     "Could not compute credits for resource %s during %s".
374                       format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
375
376                 case failed @ Failed(e, m) ⇒
377                   throw new Exception(m, e)
378
379                 case Just(credits) ⇒
380                   chargeslot.copy(computedCredits = Some(credits))
381               }
382           }
383       }
384     }
385
386     val result = fullChargeslotsM match {
387       case Just(fullChargeslots) ⇒
388         referenceTimeslot -> fullChargeslots
389       case NoVal ⇒
390         null
391       case failed @ Failed(e, m) ⇒
392         throw new Exception(m, e)
393     }
394
395 //    clog.end()
396
397     result
398   }
399
400   /**
401    * Create a list of wallet entries by charging for a resource event.
402    *
403    * @param currentResourceEvent The resource event to create charges for
404    * @param agreements The user's agreement names, indexed by their
405    *                   applicability timeslot
406    * @param previousAmount The current state of the resource
407    * @param previousOccurred The last time the resource state was updated
408    */
409   def chargeEvent(currentResourceEvent: ResourceEvent,
410                   agreements: SortedMap[Timeslot, String],
411                   previousAmount: Double,
412                   previousOccurred: Date,
413                   related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
414
415     assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
416     val occuredDate = new Date(currentResourceEvent.occurredMillis)
417
418     /* The following makes sure that agreements exist between the start
419      * and end days of the processed event. As agreement updates are
420      * guaranteed not to leave gaps, this means that the event can be
421      * processed correctly, as at least one agreement will be valid
422      * throughout the event's life.
423      */
424     assert(
425       agreements.keysIterator.exists {
426         p => p.includes(occuredDate)
427       } && agreements.keysIterator.exists {
428         p => p.includes(previousOccurred)
429       }
430     )
431
432     val t = Timeslot(previousOccurred, occuredDate)
433
434     // Align policy and agreement validity timeslots to the event's boundaries
435     val policyTimeslots = t.align(
436       Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
437     val agreementTimeslots = t.align(agreements.keysIterator.toList)
438
439     /*
440      * Get a set of timeslot slices covering the different durations of
441      * agreements and policies.
442      */
443     val aligned = alignTimeslots(policyTimeslots, agreementTimeslots)
444
445     val walletEntries = aligned.map {
446       x =>
447         // Retrieve agreement from the policy valid at time of event
448         val agreementName = agreements.find(y => y._1.contains(x)) match {
449           case Some(x) => x
450           case None => return Failed(new AccountingException(("Cannot find" +
451             " user agreement for period %s").format(x)))
452         }
453
454         // Do the wallet entry calculation
455         val entries = chargeEvent(
456           currentResourceEvent,
457           Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
458             return Failed(new AccountingException("Cannot get agreement for %s".format()))
459           ),
460           previousAmount,
461           previousOccurred,
462           related,
463           Some(x)
464         ) match {
465           case Just(x) => x
466           case Failed(f, e) => return Failed(f,e)
467           case NoVal => List()
468         }
469         entries
470     }.flatten
471
472     Just(walletEntries)
473   }
474
475   /**
476    * Creates a list of wallet entries by applying the agreement provisions on
477    * the resource state.
478    *
479    * @param event The resource event to create charges for
480    * @param agr The agreement implementation to use
481    * @param previousAmount The current state of the resource
482    * @param previousOccurred The timestamp of the previous event
483    * @param related Related wallet entries (TODO: should remove)
484    * @param chargeFor The duration for which the charge should be done.
485    *                  Should fall between the previous and current
486    *                  resource event boundaries
487    * @return A list of wallet entries, one for each
488    */
489   def chargeEvent(event: ResourceEvent,
490                   agr: DSLAgreement,
491                   previousAmount: Double,
492                   previousOccurred: Date,
493                   related: List[WalletEntry],
494                   chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
495
496     // If chargeFor is not null, make sure it falls within
497     // event time boundaries
498     chargeFor.map{x => assert(true,
499       Timeslot(previousOccurred, new Date(event.occurredMillis)))}
500
501     if (!event.validate())
502       return Failed(new AccountingException("Event not valid"))
503
504     val policy = Policy.policy
505     val dslResource = policy.findResource(event.resource) match {
506       case Some(x) => x
507       case None => return Failed(
508         new AccountingException("No resource [%s]".format(event.resource)))
509     }
510
511     /* This is a safeguard against the special case where the last
512      * resource state update, as marked by the lastUpdate parameter
513      * is equal to the time of the event occurrence. This means that
514      * this is the first time the resource state has been recorded.
515      * Charging in this case only makes sense for discrete resources.
516      */
517     if (previousOccurred.getTime == event.occurredMillis) {
518       dslResource.costPolicy match {
519         case DiscreteCostPolicy => //Ok
520         case _ => return Some(List())
521       }
522     }
523
524     val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
525     val amount = creditCalculationValueM match {
526       case failed @ Failed(_, _) ⇒
527         return failed
528       case Just(amount) ⇒
529         amount
530       case NoVal ⇒
531         0.0
532     }
533
534     // We don't do strict checking for all cases for OnOffPolicies as
535     // above, since this point won't be reached in case of error.
536     val isFinal = dslResource.costPolicy match {
537       case OnOffCostPolicy =>
538         OnOffPolicyResourceState(previousAmount) match {
539           case OnResourceState => false
540           case OffResourceState => true
541         }
542       case _ => true
543     }
544
545     /*
546      * Get the timeslot for which this event will be charged. In case we
547      * have a discrete resource, we do not really care for the time duration
548      * of an event. To process all events in a uniform way, we create an
549      * artificial timeslot lasting the minimum amount of time. In all other
550      * cases, we first check whether a desired charge period passed as
551      * an argument.
552      */
553     val timeslot = dslResource.costPolicy match {
554       case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
555         new Date(event.occurredMillis))
556       case _ => chargeFor match {
557         case Some(x) => x
558         case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
559       }
560     }
561
562     /*
563      * The following splits the chargable timeslot into smaller timeslots to
564      * comply with different applicability periods for algorithms and
565      * pricelists defined by the provided agreement.
566      */
567     val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
568
569     val timeReceived = System.currentTimeMillis
570
571     val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
572
573     val entries = chargeChunks.map { c=>
574         WalletEntry(
575           id = CryptoUtils.sha1(c.id),
576           occurredMillis = event.occurredMillis,
577           receivedMillis = timeReceived,
578           sourceEventIDs = rel,
579           value = c.cost,
580           reason = c.reason,
581           userId = event.userId,
582           resource = event.resource,
583           instanceId = event.instanceId,
584           finalized = isFinal
585         )
586     }
587     Just(entries)
588   }
589
590   /**
591    * Create a
592    */
593   def calcChangeChunks(agr: DSLAgreement, volume: Double,
594                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
595
596     val alg = resolveEffectiveAlgorithmsForTimeslot(t, agr)
597     val pri = resolveEffectivePricelistsForTimeslot(t, agr)
598     val chunks = splitChargeChunks(alg, pri)
599     val algChunked = chunks._1
600     val priChunked = chunks._2
601
602     assert(algChunked.size == priChunked.size)
603
604     res.costPolicy match {
605       case DiscreteCostPolicy => calcChargeChunksDiscrete(algChunked, priChunked, volume, res)
606       case _ => calcChargeChunksContinuous(algChunked, priChunked, volume, res)
607     }
608   }
609
610   /**
611    * Get a list of charge chunks for discrete resources.
612    */
613   private[logic]
614   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
615                                priChunked: Map[Timeslot, DSLPriceList],
616                                volume: Double, res: DSLResource): List[ChargeChunk] = {
617     // In case of descrete resources, we only a expect a
618     assert(algChunked.size == 1)
619     assert(priChunked.size == 1)
620     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
621
622     List(ChargeChunk(volume,
623       algChunked.valuesIterator.next.algorithms.getOrElse(res, ""),
624       priChunked.valuesIterator.next.prices.getOrElse(res, 0),
625       algChunked.keySet.head, res))
626   }
627
628   /**
629    * Get a list of charge chunks for continuous resources.
630    */
631   private[logic]
632   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
633                                  priChunked: Map[Timeslot, DSLPriceList],
634                                  volume: Double, res: DSLResource): List[ChargeChunk] = {
635     algChunked.keysIterator.map {
636       x =>
637         ChargeChunk(volume,
638           algChunked.get(x).get.algorithms.getOrElse(res, ""),
639           priChunked.get(x).get.prices.getOrElse(res, 0), x, res)
640     }.toList
641   }
642
643   /**
644    * Align charge timeslots between algorithms and pricelists. As algorithm
645    * and pricelists can have different effectivity periods, this method
646    * examines them and splits them as necessary.
647    */
648   private[logic] def splitChargeChunks(alg: SortedMap[Timeslot, DSLAlgorithm],
649                                        price: SortedMap[Timeslot, DSLPriceList]) :
650     (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
651
652     val zipped = alg.keySet.zip(price.keySet)
653
654     zipped.find(p => !p._1.equals(p._2)) match {
655       case None => (alg, price)
656       case Some(x) =>
657         val algTimeslot = x._1
658         val priTimeslot = x._2
659
660         assert(algTimeslot.from == priTimeslot.from)
661
662         if (algTimeslot.endsAfter(priTimeslot)) {
663           val slices = algTimeslot.slice(priTimeslot.to)
664           val algo = alg.get(algTimeslot).get
665           val newalg = alg - algTimeslot ++ Map(slices.apply(0) -> algo) ++ Map(slices.apply(1) -> algo)
666           splitChargeChunks(newalg, price)
667         }
668         else {
669           val slices = priTimeslot.slice(priTimeslot.to)
670           val pl = price.get(priTimeslot).get
671           val newPrice = price - priTimeslot ++ Map(slices.apply(0) -> pl) ++ Map(slices.apply(1) -> pl)
672           splitChargeChunks(alg, newPrice)
673         }
674     }
675   }
676
677   /**
678    * Given two lists of timeslots, produce a list which contains the
679    * set of timeslot slices, as those are defined by
680    * timeslot overlaps.
681    *
682    * For example, given the timeslots a and b below, split them as shown.
683    *
684    * a = |****************|
685    *     ^                ^
686    *   a.from            a.to
687    * b = |*********|
688    *     ^         ^
689    *   b.from     b.to
690    *
691    * result: List(Timeslot(a.from, b.to), Timeslot(b.to, a.to))
692    */
693   private[logic] def alignTimeslots(a: List[Timeslot],
694                                     b: List[Timeslot]): List[Timeslot] = {
695
696     def safeTail(foo: List[Timeslot]) = foo match {
697       case Nil       => List()
698       case x :: Nil  => List()
699       case x :: rest => rest
700     }
701
702     if (a.isEmpty) return b
703     if (b.isEmpty) return a
704
705     assert (a.head.from == b.head.from)
706
707     if (a.head.endsAfter(b.head)) {
708       val slice = a.head.slice(b.head.to)
709       slice.head :: alignTimeslots(slice.last :: a.tail, safeTail(b))
710     } else if (b.head.endsAfter(a.head)) {
711       val slice = b.head.slice(a.head.to)
712       slice.head :: alignTimeslots(safeTail(a), slice.last :: b.tail)
713     } else {
714       a.head :: alignTimeslots(safeTail(a), safeTail(b))
715     }
716   }
717 }
718
719 /**
720  * Encapsulates a computation for a specific timeslot of
721  * resource usage.
722  */
723 case class ChargeChunk(value: Double, algorithm: String,
724                        price: Double, when: Timeslot,
725                        resource: DSLResource) {
726   assert(value > 0)
727   assert(!algorithm.isEmpty)
728   assert(resource != null)
729
730   def cost(): Double =
731     //TODO: Apply the algorithm, when we start parsing it
732     resource.costPolicy match {
733       case DiscreteCostPolicy =>
734         value * price
735       case _ =>
736         value * price * when.hours
737     }
738
739   def reason(): String =
740     resource.costPolicy match {
741       case DiscreteCostPolicy =>
742         "%f %s at %s @ %f/%s".format(value, resource.unit, when.from, price,
743           resource.unit)
744       case ContinuousCostPolicy =>
745         "%f %s of %s from %s to %s @ %f/%s".format(value, resource.unit,
746           resource.name, when.from, when.to, price, resource.unit)
747       case OnOffCostPolicy =>
748         "%f %s of %s from %s to %s @ %f/%s".format(when.hours, resource.unit,
749           resource.name, when.from, when.to, price, resource.unit)
750     }
751
752   def id(): String =
753     CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
754       resource.name, System.currentTimeMillis()))
755 }
756
757 /** An exception raised when something goes wrong with accounting */
758 class AccountingException(msg: String) extends Exception(msg)