Code reorganization
[aquarium] / src / main / scala / gr / grnet / aquarium / logic / accounting / Accounting.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.logic.accounting
37
38 import gr.grnet.aquarium.util.shortClassNameOf
39 import algorithm.CostPolicyAlgorithmCompiler
40 import dsl._
41 import collection.immutable.SortedMap
42 import java.util.Date
43 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
44 import gr.grnet.aquarium.util.date.MutableDateCalc
45 import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
46 import gr.grnet.aquarium.store.PolicyStore
47 import gr.grnet.aquarium.AquariumException
48 import gr.grnet.aquarium.events.{WalletEntry, ResourceEvent}
49
50 /**
51  * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
52  *
53  * @param startMillis
54  * @param stopMillis
55  * @param algorithmDefinition
56  * @param unitPrice
57  * @param computedCredits The computed credits
58  */
59 case class Chargeslot(startMillis: Long,
60                       stopMillis: Long,
61                       algorithmDefinition: String,
62                       unitPrice: Double,
63                       computedCredits: Option[Double] = None) {
64
65   override def toString = "%s(%s, %s, %s, %s, %s)".format(
66     shortClassNameOf(this),
67     new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
68     new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
69     unitPrice,
70     computedCredits,
71     algorithmDefinition
72   )
73 }
74
75 /**
76  * Methods for converting accounting events to wallet entries.
77  *
78  * @author Georgios Gousios <gousiosg@gmail.com>
79  * @author Christos KK Loverdos <loverdos@gmail.com>
80  */
81 trait Accounting extends DSLUtils with Loggable {
82   /**
83    * Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
84    *
85    * @param referenceTimeslot
86    * @param policyTimeslots
87    * @param agreementTimeslots
88    * @return
89    */
90   protected
91   def splitTimeslotByPoliciesAndAgreements(referenceTimeslot: Timeslot,
92                                            policyTimeslots: List[Timeslot],
93                                            agreementTimeslots: List[Timeslot],
94                                            clogM: Maybe[ContextualLogger] = NoVal): List[Timeslot] = {
95
96 //    val clog = ContextualLogger.fromOther(clogM, logger, "splitTimeslotByPoliciesAndAgreements()")
97 //    clog.begin()
98
99     // Align policy and agreement validity timeslots to the referenceTimeslot
100     val alignedPolicyTimeslots    = referenceTimeslot.align(policyTimeslots)
101     val alignedAgreementTimeslots = referenceTimeslot.align(agreementTimeslots)
102
103 //    clog.debug("referenceTimeslot = %s", referenceTimeslot)
104 //    clog.debugSeq("alignedPolicyTimeslots", alignedPolicyTimeslots, 0)
105 //    clog.debugSeq("alignedAgreementTimeslots", alignedAgreementTimeslots, 0)
106
107     val result = alignTimeslots(alignedPolicyTimeslots, alignedAgreementTimeslots)
108 //    clog.debugSeq("result", result, 1)
109 //    clog.end()
110     result
111   }
112
113   /**
114    * Given a reference timeslot, we have to break it up to a series of timeslots where a particular
115    * algorithm and price unit is in effect.
116    *
117    */
118   protected
119   def resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot: Timeslot,
120                                               agreement: DSLAgreement,
121                                               clogM: Maybe[ContextualLogger] = NoVal): (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
122
123     val clog = ContextualLogger.fromOther(clogM, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
124
125     // Note that most of the code is taken from calcChangeChunks()
126     val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
127     val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
128     val chargeChunks = splitChargeChunks(alg, pri)
129     val algorithmByTimeslot = chargeChunks._1
130     val pricelistByTimeslot = chargeChunks._2
131
132     assert(algorithmByTimeslot.size == pricelistByTimeslot.size)
133
134     (algorithmByTimeslot, pricelistByTimeslot)
135   }
136
137   protected
138   def computeInitialChargeslots(referenceTimeslot: Timeslot,
139                                 dslResource: DSLResource,
140                                 policiesByTimeslot: Map[Timeslot, DSLPolicy],
141                                 agreementNamesByTimeslot: Map[Timeslot, String],
142                                 contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[List[Chargeslot]] = Maybe {
143
144     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeInitialChargeslots()")
145 //    clog.begin()
146
147     val policyTimeslots = policiesByTimeslot.keySet
148     val agreementTimeslots = agreementNamesByTimeslot.keySet
149
150 //    clog.debugMap("policiesByTimeslot", policiesByTimeslot, 1)
151 //    clog.debugMap("agreementNamesByTimeslot", agreementNamesByTimeslot, 1)
152
153     def getPolicy(ts: Timeslot): DSLPolicy = {
154       policiesByTimeslot.find(_._1.contains(ts)).get._2
155     }
156     def getAgreementName(ts: Timeslot): String = {
157       agreementNamesByTimeslot.find(_._1.contains(ts)).get._2
158     }
159
160     // 1. Round ONE: split time according to overlapping policies and agreements.
161 //    clog.begin("ROUND 1")
162     val alignedTimeslots = splitTimeslotByPoliciesAndAgreements(referenceTimeslot, policyTimeslots.toList, agreementTimeslots.toList, Just(clog))
163 //    clog.debugSeq("alignedTimeslots", alignedTimeslots, 1)
164 //    clog.end("ROUND 1")
165
166     // 2. Round TWO: Use the aligned timeslots of Round ONE to produce even more
167     //    fine-grained timeslots according to applicable algorithms.
168     //    Then pack the info into charge slots.
169 //    clog.begin("ROUND 2")
170     val allChargeslots = for {
171       alignedTimeslot <- alignedTimeslots
172     } yield {
173 //      val alignedTimeslotMsg = "alignedTimeslot = %s".format(alignedTimeslot)
174 //      clog.begin(alignedTimeslotMsg)
175
176       val dslPolicy = getPolicy(alignedTimeslot)
177 //      clog.debug("dslPolicy = %s", dslPolicy)
178       val agreementName = getAgreementName(alignedTimeslot)
179 //      clog.debug("agreementName = %s", agreementName)
180       val agreementOpt = dslPolicy.findAgreement(agreementName)
181 //      clog.debug("agreementOpt = %s", agreementOpt)
182
183       agreementOpt match {
184         case None ⇒
185           val errMsg = "Unknown agreement %s during %s".format(agreementName, alignedTimeslot)
186           clog.error("%s", errMsg)
187           throw new Exception(errMsg)
188
189         case Some(agreement) ⇒
190           // TODO: Factor this out, just like we did with:
191           // TODO:  val alignedTimeslots = splitTimeslotByPoliciesAndAgreements
192           // Note that most of the code is already taken from calcChangeChunks()
193           val r = resolveEffectiveAlgorithmsAndPriceLists(alignedTimeslot, agreement, Just(clog))
194           val algorithmByTimeslot: Map[Timeslot, DSLAlgorithm] = r._1
195           val pricelistByTimeslot: Map[Timeslot, DSLPriceList] = r._2
196
197           // Now, the timeslots must be the same
198           val finegrainedTimeslots = algorithmByTimeslot.keySet
199
200           val chargeslots = for {
201             finegrainedTimeslot <- finegrainedTimeslots
202           } yield {
203 //            val finegrainedTimeslotMsg = "finegrainedTimeslot = %s".format(finegrainedTimeslot)
204 //            clog.begin(finegrainedTimeslotMsg)
205
206             val dslAlgorithm = algorithmByTimeslot(finegrainedTimeslot) // TODO: is this correct?
207 //            clog.debug("dslAlgorithm = %s", dslAlgorithm)
208 //            clog.debugMap("dslAlgorithm.algorithms", dslAlgorithm.algorithms, 1)
209             val dslPricelist = pricelistByTimeslot(finegrainedTimeslot) // TODO: is this correct?
210 //            clog.debug("dslPricelist = %s", dslPricelist)
211 //            clog.debug("dslResource = %s", dslResource)
212             val algorithmDefOpt = dslAlgorithm.algorithms.get(dslResource)
213 //            clog.debug("algorithmDefOpt = %s", algorithmDefOpt)
214             val priceUnitOpt = dslPricelist.prices.get(dslResource)
215 //            clog.debug("priceUnitOpt = %s", priceUnitOpt)
216
217             val chargeslot = (algorithmDefOpt, priceUnitOpt) match {
218               case (None, None) ⇒
219                 throw new Exception(
220                   "Unknown algorithm and price unit for resource %s during %s".
221                     format(dslResource, finegrainedTimeslot))
222               case (None, _) ⇒
223                 throw new Exception(
224                   "Unknown algorithm for resource %s during %s".
225                     format(dslResource, finegrainedTimeslot))
226               case (_, None) ⇒
227                 throw new Exception(
228                   "Unknown price unit for resource %s during %s".
229                     format(dslResource, finegrainedTimeslot))
230               case (Some(algorithmDefinition), Some(priceUnit)) ⇒
231                 Chargeslot(finegrainedTimeslot.from.getTime, finegrainedTimeslot.to.getTime, algorithmDefinition, priceUnit)
232             }
233
234 //            clog.end(finegrainedTimeslotMsg)
235             chargeslot
236           }
237
238 //          clog.end(alignedTimeslotMsg)
239           chargeslots.toList
240       }
241     }
242 //    clog.end("ROUND 2")
243
244
245     val result = allChargeslots.flatten
246 //    clog.debugSeq("result", allChargeslots, 1)
247 //    clog.end()
248     result
249   }
250
251   /**
252    * Compute the charge slots generated by a particular resource event.
253    *
254    */
255   def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEvent],
256                              currentResourceEvent: ResourceEvent,
257                              oldCredits: Double,
258                              oldTotalAmount: Double,
259                              newTotalAmount: Double,
260                              dslResource: DSLResource,
261                              defaultResourceMap: DSLResourcesMap,
262                              agreementNamesByTimeslot: Map[Timeslot, String],
263                              algorithmCompiler: CostPolicyAlgorithmCompiler,
264                              policyStore: PolicyStore,
265                              contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[(Timeslot, List[Chargeslot])] = Maybe {
266
267     val clog = ContextualLogger.fromOther(contextualLogger, logger, "computeFullChargeslots()")
268 //    clog.begin()
269
270     val occurredDate = currentResourceEvent.occurredDate
271     val occurredMillis = currentResourceEvent.occurredMillis
272     val costPolicy = dslResource.costPolicy
273
274     val dsl = new DSL{}
275     val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
276       // We need a previous event
277       case true ⇒
278         previousResourceEventM match {
279           // We have a previous event
280           case Just(previousResourceEvent) ⇒
281 //            clog.debug("Have previous event")
282 //            clog.debug("previousValue = %s", previousResourceEvent.value)
283
284             val referenceTimeslot = Timeslot(previousResourceEvent.occurredDate, occurredDate)
285 //            clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
286
287             // all policies within the interval from previous to current resource event
288 //            clog.debug("Calling policyStore.loadAndSortPoliciesWithin(%s)", referenceTimeslot)
289             val relevantPolicies = policyStore.loadAndSortPoliciesWithin(referenceTimeslot.from.getTime, referenceTimeslot.to.getTime, dsl)
290 //            clog.debugMap("==> relevantPolicies", relevantPolicies, 0)
291
292             (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
293
294           // We do not have a previous event
295           case NoVal ⇒
296             throw new AquariumException(
297               "Unable to charge. No previous event given for %s".
298                 format(currentResourceEvent.toDebugString()))
299
300           // We could not obtain a previous event
301           case failed @ Failed(e) ⇒
302             throw new AquariumException(
303               "Unable to charge. Could not obtain previous event for %s".
304                 format(currentResourceEvent.toDebugString()), e)
305         }
306
307       // We do not need a previous event
308       case false ⇒
309         // ... so we cannot compute timedelta from a previous event, there is just one chargeslot
310         // referring to (almost) an instant in time
311 //        clog.debug("DO NOT have previous event")
312         val previousValue = costPolicy.getResourceInstanceUndefinedAmount
313 //        clog.debug("previousValue = costPolicy.getResourceInstanceUndefinedAmount = %s", previousValue)
314
315         val referenceTimeslot = Timeslot(new MutableDateCalc(occurredDate).goPreviousMilli.toDate, occurredDate)
316 //        clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
317
318 //        clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
319         val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl)
320 //        clog.debug("  ==> relevantPolicyM = %s", relevantPolicyM)
321
322         val relevantPolicies = relevantPolicyM match {
323           case Just(relevantPolicy) ⇒
324             Map(referenceTimeslot -> relevantPolicy)
325           case NoVal ⇒
326             throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot))
327           case failed @ Failed(e) ⇒
328             throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot), e)
329
330         }
331
332         (referenceTimeslot, relevantPolicies, previousValue)
333     }
334
335     val initialChargeslotsM = computeInitialChargeslots(
336       referenceTimeslot,
337       dslResource,
338       relevantPolicies,
339       agreementNamesByTimeslot,
340       Just(clog)
341     )
342
343     val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
344       chargeslots.map {
345         case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
346           val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
347           execAlgorithmM match {
348             case NoVal ⇒
349               throw new AquariumException("Could not compile algorithm %s".format(algorithmDefinition))
350
351             case failed @ Failed(e) ⇒
352               failed.throwMe
353
354             case Just(execAlgorithm) ⇒
355               val valueMap = costPolicy.makeValueMap(
356                 oldCredits,
357                 oldTotalAmount,
358                 newTotalAmount,
359                 stopMillis - startMillis,
360                 previousValue,
361                 currentResourceEvent.value,
362                 unitPrice
363               )
364
365 //              clog.debug("execAlgorithm = %s", execAlgorithm)
366               clog.debugMap("valueMap", valueMap, 1)
367
368               // This is it
369               val creditsM = execAlgorithm.apply(valueMap)
370
371               creditsM match {
372                 case NoVal ⇒
373                   throw new AquariumException(
374                     "Could not compute credits for resource %s during %s".
375                       format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
376
377                 case failed @ Failed(e) ⇒
378                   failed.throwMe
379
380                 case Just(credits) ⇒
381                   chargeslot.copy(computedCredits = Some(credits))
382               }
383           }
384       }
385     }
386
387     val result = fullChargeslotsM match {
388       case Just(fullChargeslots) ⇒
389         referenceTimeslot -> fullChargeslots
390       case NoVal ⇒
391         null
392       case failed @ Failed(e) ⇒
393         failed.throwMe
394     }
395
396 //    clog.end()
397
398     result
399   }
400
401   /**
402    * Create a list of wallet entries by charging for a resource event.
403    *
404    * @param currentResourceEvent The resource event to create charges for
405    * @param agreements The user's agreement names, indexed by their
406    *                   applicability timeslot
407    * @param previousAmount The current state of the resource
408    * @param previousOccurred The last time the resource state was updated
409    */
410   def chargeEvent(currentResourceEvent: ResourceEvent,
411                   agreements: SortedMap[Timeslot, String],
412                   previousAmount: Double,
413                   previousOccurred: Date,
414                   related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
415
416     assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
417     val occuredDate = new Date(currentResourceEvent.occurredMillis)
418
419     /* The following makes sure that agreements exist between the start
420      * and end days of the processed event. As agreement updates are
421      * guaranteed not to leave gaps, this means that the event can be
422      * processed correctly, as at least one agreement will be valid
423      * throughout the event's life.
424      */
425     assert(
426       agreements.keysIterator.exists {
427         p => p.includes(occuredDate)
428       } && agreements.keysIterator.exists {
429         p => p.includes(previousOccurred)
430       }
431     )
432
433     val t = Timeslot(previousOccurred, occuredDate)
434
435     // Align policy and agreement validity timeslots to the event's boundaries
436     val policyTimeslots = t.align(
437       Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
438     val agreementTimeslots = t.align(agreements.keysIterator.toList)
439
440     /*
441      * Get a set of timeslot slices covering the different durations of
442      * agreements and policies.
443      */
444     val aligned = alignTimeslots(policyTimeslots, agreementTimeslots)
445
446     val walletEntries = aligned.map {
447       x =>
448         // Retrieve agreement from the policy valid at time of event
449         val agreementName = agreements.find(y => y._1.contains(x)) match {
450           case Some(x) => x
451           case None => return Failed(new AccountingException(("Cannot find" +
452             " user agreement for period %s").format(x)))
453         }
454
455         // Do the wallet entry calculation
456         val entries = chargeEvent(
457           currentResourceEvent,
458           Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
459             return Failed(new AccountingException("Cannot get agreement for %s".format()))
460           ),
461           previousAmount,
462           previousOccurred,
463           related,
464           Some(x)
465         ) match {
466           case Just(x) => x
467           case Failed(f) => return Failed(f)
468           case NoVal => List()
469         }
470         entries
471     }.flatten
472
473     Just(walletEntries)
474   }
475
476   /**
477    * Creates a list of wallet entries by applying the agreement provisions on
478    * the resource state.
479    *
480    * @param event The resource event to create charges for
481    * @param agr The agreement implementation to use
482    * @param previousAmount The current state of the resource
483    * @param previousOccurred The timestamp of the previous event
484    * @param related Related wallet entries (TODO: should remove)
485    * @param chargeFor The duration for which the charge should be done.
486    *                  Should fall between the previous and current
487    *                  resource event boundaries
488    * @return A list of wallet entries, one for each
489    */
490   def chargeEvent(event: ResourceEvent,
491                   agr: DSLAgreement,
492                   previousAmount: Double,
493                   previousOccurred: Date,
494                   related: List[WalletEntry],
495                   chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
496
497     // If chargeFor is not null, make sure it falls within
498     // event time boundaries
499     chargeFor.map{x => assert(true,
500       Timeslot(previousOccurred, new Date(event.occurredMillis)))}
501
502     if (!event.validate())
503       return Failed(new AccountingException("Event not valid"))
504
505     val policy = Policy.policy
506     val dslResource = policy.findResource(event.resource) match {
507       case Some(x) => x
508       case None => return Failed(
509         new AccountingException("No resource [%s]".format(event.resource)))
510     }
511
512     /* This is a safeguard against the special case where the last
513      * resource state update, as marked by the lastUpdate parameter
514      * is equal to the time of the event occurrence. This means that
515      * this is the first time the resource state has been recorded.
516      * Charging in this case only makes sense for discrete resources.
517      */
518     if (previousOccurred.getTime == event.occurredMillis) {
519       dslResource.costPolicy match {
520         case DiscreteCostPolicy => //Ok
521         case _ => return Some(List())
522       }
523     }
524
525     val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
526     val amount = creditCalculationValueM match {
527       case failed @ Failed(_) ⇒
528         return failed
529       case Just(amount) ⇒
530         amount
531       case NoVal ⇒
532         0.0
533     }
534
535     // We don't do strict checking for all cases for OnOffPolicies as
536     // above, since this point won't be reached in case of error.
537     val isFinal = dslResource.costPolicy match {
538       case OnOffCostPolicy =>
539         OnOffPolicyResourceState(previousAmount) match {
540           case OnResourceState => false
541           case OffResourceState => true
542         }
543       case _ => true
544     }
545
546     /*
547      * Get the timeslot for which this event will be charged. In case we
548      * have a discrete resource, we do not really care for the time duration
549      * of an event. To process all events in a uniform way, we create an
550      * artificial timeslot lasting the minimum amount of time. In all other
551      * cases, we first check whether a desired charge period passed as
552      * an argument.
553      */
554     val timeslot = dslResource.costPolicy match {
555       case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
556         new Date(event.occurredMillis))
557       case _ => chargeFor match {
558         case Some(x) => x
559         case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
560       }
561     }
562
563     /*
564      * The following splits the chargable timeslot into smaller timeslots to
565      * comply with different applicability periods for algorithms and
566      * pricelists defined by the provided agreement.
567      */
568     val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
569
570     val timeReceived = System.currentTimeMillis
571
572     val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
573
574     val entries = chargeChunks.map { c=>
575         WalletEntry(
576           id = CryptoUtils.sha1(c.id),
577           occurredMillis = event.occurredMillis,
578           receivedMillis = timeReceived,
579           sourceEventIDs = rel,
580           value = c.cost,
581           reason = c.reason,
582           userId = event.userID,
583           resource = event.resource,
584           instanceId = event.instanceID,
585           finalized = isFinal
586         )
587     }
588     Just(entries)
589   }
590
591   /**
592    * Create a
593    */
594   def calcChangeChunks(agr: DSLAgreement, volume: Double,
595                        res: DSLResource, t: Timeslot): List[ChargeChunk] = {
596
597     val alg = resolveEffectiveAlgorithmsForTimeslot(t, agr)
598     val pri = resolveEffectivePricelistsForTimeslot(t, agr)
599     val chunks = splitChargeChunks(alg, pri)
600     val algChunked = chunks._1
601     val priChunked = chunks._2
602
603     assert(algChunked.size == priChunked.size)
604
605     res.costPolicy match {
606       case DiscreteCostPolicy => calcChargeChunksDiscrete(algChunked, priChunked, volume, res)
607       case _ => calcChargeChunksContinuous(algChunked, priChunked, volume, res)
608     }
609   }
610
611   /**
612    * Get a list of charge chunks for discrete resources.
613    */
614   private[logic]
615   def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
616                                priChunked: Map[Timeslot, DSLPriceList],
617                                volume: Double, res: DSLResource): List[ChargeChunk] = {
618     // In case of descrete resources, we only a expect a
619     assert(algChunked.size == 1)
620     assert(priChunked.size == 1)
621     assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
622
623     List(ChargeChunk(volume,
624       algChunked.valuesIterator.next.algorithms.getOrElse(res, ""),
625       priChunked.valuesIterator.next.prices.getOrElse(res, 0),
626       algChunked.keySet.head, res))
627   }
628
629   /**
630    * Get a list of charge chunks for continuous resources.
631    */
632   private[logic]
633   def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
634                                  priChunked: Map[Timeslot, DSLPriceList],
635                                  volume: Double, res: DSLResource): List[ChargeChunk] = {
636     algChunked.keysIterator.map {
637       x =>
638         ChargeChunk(volume,
639           algChunked.get(x).get.algorithms.getOrElse(res, ""),
640           priChunked.get(x).get.prices.getOrElse(res, 0), x, res)
641     }.toList
642   }
643
644   /**
645    * Align charge timeslots between algorithms and pricelists. As algorithm
646    * and pricelists can have different effectivity periods, this method
647    * examines them and splits them as necessary.
648    */
649   private[logic] def splitChargeChunks(alg: SortedMap[Timeslot, DSLAlgorithm],
650                                        price: SortedMap[Timeslot, DSLPriceList]) :
651     (Map[Timeslot, DSLAlgorithm], Map[Timeslot, DSLPriceList]) = {
652
653     val zipped = alg.keySet.zip(price.keySet)
654
655     zipped.find(p => !p._1.equals(p._2)) match {
656       case None => (alg, price)
657       case Some(x) =>
658         val algTimeslot = x._1
659         val priTimeslot = x._2
660
661         assert(algTimeslot.from == priTimeslot.from)
662
663         if (algTimeslot.endsAfter(priTimeslot)) {
664           val slices = algTimeslot.slice(priTimeslot.to)
665           val algo = alg.get(algTimeslot).get
666           val newalg = alg - algTimeslot ++ Map(slices.apply(0) -> algo) ++ Map(slices.apply(1) -> algo)
667           splitChargeChunks(newalg, price)
668         }
669         else {
670           val slices = priTimeslot.slice(priTimeslot.to)
671           val pl = price.get(priTimeslot).get
672           val newPrice = price - priTimeslot ++ Map(slices.apply(0) -> pl) ++ Map(slices.apply(1) -> pl)
673           splitChargeChunks(alg, newPrice)
674         }
675     }
676   }
677
678   /**
679    * Given two lists of timeslots, produce a list which contains the
680    * set of timeslot slices, as those are defined by
681    * timeslot overlaps.
682    *
683    * For example, given the timeslots a and b below, split them as shown.
684    *
685    * a = |****************|
686    *     ^                ^
687    *   a.from            a.to
688    * b = |*********|
689    *     ^         ^
690    *   b.from     b.to
691    *
692    * result: List(Timeslot(a.from, b.to), Timeslot(b.to, a.to))
693    */
694   private[logic] def alignTimeslots(a: List[Timeslot],
695                                     b: List[Timeslot]): List[Timeslot] = {
696
697     def safeTail(foo: List[Timeslot]) = foo match {
698       case Nil       => List()
699       case x :: Nil  => List()
700       case x :: rest => rest
701     }
702
703     if (a.isEmpty) return b
704     if (b.isEmpty) return a
705
706     assert (a.head.from == b.head.from)
707
708     if (a.head.endsAfter(b.head)) {
709       val slice = a.head.slice(b.head.to)
710       slice.head :: alignTimeslots(slice.last :: a.tail, safeTail(b))
711     } else if (b.head.endsAfter(a.head)) {
712       val slice = b.head.slice(a.head.to)
713       slice.head :: alignTimeslots(safeTail(a), slice.last :: b.tail)
714     } else {
715       a.head :: alignTimeslots(safeTail(a), safeTail(b))
716     }
717   }
718 }
719
720 /**
721  * Encapsulates a computation for a specific timeslot of
722  * resource usage.
723  */
724 case class ChargeChunk(value: Double, algorithm: String,
725                        price: Double, when: Timeslot,
726                        resource: DSLResource) {
727   assert(value > 0)
728   assert(!algorithm.isEmpty)
729   assert(resource != null)
730
731   def cost(): Double =
732     //TODO: Apply the algorithm, when we start parsing it
733     resource.costPolicy match {
734       case DiscreteCostPolicy =>
735         value * price
736       case _ =>
737         value * price * when.hours
738     }
739
740   def reason(): String =
741     resource.costPolicy match {
742       case DiscreteCostPolicy =>
743         "%f %s at %s @ %f/%s".format(value, resource.unit, when.from, price,
744           resource.unit)
745       case ContinuousCostPolicy =>
746         "%f %s of %s from %s to %s @ %f/%s".format(value, resource.unit,
747           resource.name, when.from, when.to, price, resource.unit)
748       case OnOffCostPolicy =>
749         "%f %s of %s from %s to %s @ %f/%s".format(when.hours, resource.unit,
750           resource.name, when.from, when.to, price, resource.unit)
751     }
752
753   def id(): String =
754     CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
755       resource.name, System.currentTimeMillis()))
756 }
757
758 /** An exception raised when something goes wrong with accounting */
759 class AccountingException(msg: String) extends AquariumException(msg)