Simple test case with one ON-OFF sequence.
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user
37
38
39 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
40 import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
41 import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
42 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
43 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
44 import gr.grnet.aquarium.store.{RecordID, StoreProvider, PolicyStore, UserStateStore, ResourceEventStore}
45 import gr.grnet.aquarium.logic.accounting.{Chargeslot, Accounting}
46 import scala.collection.mutable
47 import gr.grnet.aquarium.logic.events.ResourceEvent._
48 import gr.grnet.aquarium.logic.accounting.algorithm.{CostPolicyAlgorithmCompiler, SimpleCostPolicyAlgorithmCompiler}
49
50 /**
51  *
52  * @author Christos KK Loverdos <loverdos@gmail.com>
53  */
54 class UserStateComputations extends Loggable {
55   def createInitialUserState(userId: String,
56                              userCreationMillis: Long,
57                              isActive: Boolean,
58                              credits: Double,
59                              roleNames: List[String] = List(),
60                              agreementName: String = DSLAgreement.DefaultAgreementName) = {
61     val now = userCreationMillis
62
63     UserState(
64       userId,
65       userCreationMillis,
66       0L,
67       false,
68       null,
69       ImplicitlyIssuedResourceEventsSnapshot(List(), now),
70       Nil,
71       Nil,
72       LatestResourceEventsSnapshot(List(), now),
73       0L,
74       0L,
75       ActiveStateSnapshot(isActive, now),
76       CreditSnapshot(credits, now),
77       AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
78       RolesSnapshot(roleNames, now),
79       OwnedResourcesSnapshot(Nil, now),
80       Nil,
81       UserStateChangeReasonCodes.InitialCalculationCode,
82       InitialUserStateCalculation
83     )
84   }
85
86   def createInitialUserStateFrom(us: UserState): UserState = {
87     createInitialUserState(
88       us.userId,
89       us.userCreationMillis,
90       us.activeStateSnapshot.isActive,
91       us.creditsSnapshot.creditAmount,
92       us.rolesSnapshot.roles,
93       us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
94     )
95   }
96
97   def findUserStateAtEndOfBillingMonth(userId: String,
98                                        billingMonthInfo: BillingMonthInfo,
99                                        storeProvider: StoreProvider,
100                                        currentUserState: UserState,
101                                        defaultResourcesMap: DSLResourcesMap,
102                                        accounting: Accounting,
103                                        algorithmCompiler: CostPolicyAlgorithmCompiler,
104                                        calculationReason: UserStateChangeReason,
105                                        contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
106
107     val clog = ContextualLogger.fromOther(
108       contextualLogger,
109       logger,
110       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
111     clog.begin()
112
113     def doCompute: Maybe[UserState] = {
114       doFullMonthlyBilling(
115         userId,
116         billingMonthInfo,
117         storeProvider,
118         currentUserState,
119         defaultResourcesMap,
120         accounting,
121         algorithmCompiler,
122         calculationReason,
123         Just(clog))
124     }
125
126     val userStateStore = storeProvider.userStateStore
127     val resourceEventStore = storeProvider.resourceEventStore
128
129     val userCreationMillis = currentUserState.userCreationMillis
130     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
131     val billingMonthStartMillis = billingMonthInfo.startMillis
132     val billingMonthStopMillis  = billingMonthInfo.stopMillis
133
134     if(billingMonthStopMillis < userCreationMillis) {
135       // If the user did not exist for this billing month, piece of cake
136       clog.debug("User did not exist before %s", userCreationDateCalc)
137
138       // NOTE: Reason here will be: InitialUserStateCalculation
139       val initialUserState0 = createInitialUserStateFrom(currentUserState)
140       val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
141
142       clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
143       clog.end()
144
145       initialUserStateM
146     } else {
147       // Ask DB cache for the latest known user state for this billing period
148       val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
149         userId,
150         billingMonthInfo.year,
151         billingMonthInfo.month)
152
153       latestUserStateM match {
154         case NoVal ⇒
155           // Not found, must compute
156           clog.debug("No user state found from cache, will have to (re)compute")
157           val result = doCompute
158           clog.end()
159           result
160           
161         case failed @ Failed(_, _) ⇒
162           clog.warn("Failure while quering cache for user state: %s", failed)
163           clog.end()
164           failed
165
166         case Just(latestUserState) ⇒
167           // Found a "latest" user state but need to see if it is indeed the true and one latest.
168           // For this reason, we must count the events again.
169          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
170          val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
171            userId,
172            billingMonthStartMillis,
173            billingMonthStopMillis)
174
175          actualOOSEventsCounterM match {
176            case NoVal ⇒
177              val errMsg = "No counter computed for out of sync events. Should at least be zero."
178              clog.warn(errMsg)
179              val result = Failed(new Exception(errMsg))
180              clog.end()
181              result
182
183            case failed @ Failed(_, _) ⇒
184              clog.warn("Failure while querying for out of sync events: %s", failed)
185              clog.end()
186              failed
187
188            case Just(actualOOSEventsCounter) ⇒
189              val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
190              counterDiff match {
191                // ZERO, we are OK!
192                case 0 ⇒
193                  // NOTE: Keep the caller's calculation reason
194                  Just(latestUserState.copyForChangeReason(calculationReason))
195
196                // We had more, so must recompute
197                case n if n > 0 ⇒
198                  clog.debug(
199                    "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
200                  val result = doCompute
201                  clog.end()
202                  result
203
204                // We had less????
205                case n if n < 0 ⇒
206                  val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
207                  clog.warn(errMsg)
208                  val result = Failed(new Exception(errMsg))
209                  clog.end()
210                  result
211              }
212          }
213       }
214     }
215   }
216
217   //+ Utility methods
218   def rcDebugInfo(rcEvent: ResourceEvent) = {
219     rcEvent.toDebugString(false)
220   }
221   //- Utility methods
222
223   def processResourceEvent(startingUserState: UserState,
224                            userStateWorker: UserStateWorker,
225                            currentResourceEvent: ResourceEvent,
226                            policyStore: PolicyStore,
227                            stateChangeReason: UserStateChangeReason,
228                            billingMonthInfo: BillingMonthInfo,
229                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
230                            algorithmCompiler: CostPolicyAlgorithmCompiler,
231                            clogM: Maybe[ContextualLogger] = NoVal): UserState = {
232
233     val clog = ContextualLogger.fromOther(clogM, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
234
235     var _workingUserState = startingUserState
236
237     val theResource = currentResourceEvent.safeResource
238     val theInstanceId = currentResourceEvent.safeInstanceId
239     val theValue = currentResourceEvent.value
240
241     val accounting = userStateWorker.accounting
242     val resourcesMap = userStateWorker.resourcesMap
243
244     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
245     clog.begin(currentResourceEventDebugInfo)
246
247     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
248
249     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
250     // But to make this decision, first we need the resource definition (and its cost policy).
251     val dslResourceOpt = resourcesMap.findResource(theResource)
252     dslResourceOpt match {
253       // We have a resource (and thus a cost policy)
254       case Some(dslResource) ⇒
255         val costPolicy = dslResource.costPolicy
256         clog.debug("Cost policy %s for %s", costPolicy, dslResource)
257         val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
258         if(!isBillable) {
259           // The resource event is not billable
260           clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
261         } else {
262           // The resource event is billable
263           // Find the previous event.
264           // This is (potentially) needed to calculate new credit amount and new resource instance amount
265           val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
266           clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
267
268           val havePreviousResourceEvent = previousResourceEventM.isJust
269           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
270           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
271             // This must be the first resource event of its kind, ever.
272             // TODO: We should normally check the DB to verify the claim (?)
273             clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
274             userStateWorker.updateIgnored(currentResourceEvent)
275           } else {
276             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
277             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
278             val oldCredits = _workingUserState.creditsSnapshot.creditAmount
279
280             // A. Compute new resource instance accumulating amount
281             val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
282
283             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
284
285             // B. Compute new wallet entries
286             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
287             val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
288
289             //              clog.debug("Computing full chargeslots")
290             val fullChargeslotsM = accounting.computeFullChargeslots(
291               previousResourceEventM,
292               currentResourceEvent,
293               oldCredits,
294               oldAmount,
295               newAmount,
296               dslResource,
297               resourcesMap,
298               alltimeAgreements,
299               algorithmCompiler,
300               policyStore,
301               Just(clog)
302             )
303
304             // We have the chargeslots, let's associate them with the current event
305             fullChargeslotsM match {
306               case Just((referenceTimeslot, fullChargeslots)) ⇒
307                 if(fullChargeslots.length == 0) {
308                   // At least one chargeslot is required.
309                   throw new Exception("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
310                 }
311                 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
312
313                 // C. Compute new credit amount (based on the charge slots)
314                 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
315                 val newCredits = oldCredits - newCreditsDiff
316
317                 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
318                   val newWalletEntry = NewWalletEntry(
319                     userStateWorker.userId,
320                     newCreditsDiff,
321                     oldCredits,
322                     newCredits,
323                     TimeHelpers.nowMillis,
324                     referenceTimeslot,
325                     billingMonthInfo.year,
326                     billingMonthInfo.month,
327                     if(havePreviousResourceEvent)
328                       List(currentResourceEvent, justForSure(previousResourceEventM).get)
329                     else
330                       List(currentResourceEvent),
331                     fullChargeslots,
332                     dslResource,
333                     currentResourceEvent.isSynthetic
334                   )
335                   clog.debug("New %s", newWalletEntry)
336
337                   walletEntriesBuffer += newWalletEntry
338                 } else {
339                   clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
340                 }
341
342                 _workingUserState = _workingUserState.copy(
343                   creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis),
344                   stateChangeCounter = _workingUserState.stateChangeCounter + 1,
345                   totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
346                 )
347
348               case NoVal ⇒
349                 // At least one chargeslot is required.
350                 throw new Exception("No chargeslots computed")
351
352               case failed@Failed(e, m) ⇒
353                 throw new Exception(m, e)
354             }
355           }
356         }
357
358         // After processing, all events billable or not update the previous state
359         userStateWorker.updatePrevious(currentResourceEvent)
360
361       // We do not have a resource (and thus, no cost policy)
362       case None ⇒
363         // Now, this is a matter of politics: what do we do if no policy was found?
364         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
365     } // dslResourceOpt match
366
367     clog.end(currentResourceEventDebugInfo)
368
369     _workingUserState
370   }
371
372   def processResourceEvents(resourceEvents: Traversable[ResourceEvent],
373                             startingUserState: UserState,
374                             userStateWorker: UserStateWorker,
375                             policyStore: PolicyStore,
376                             stateChangeReason: UserStateChangeReason,
377                             billingMonthInfo: BillingMonthInfo,
378                             walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
379                             algorithmCompiler: CostPolicyAlgorithmCompiler,
380                             clogM: Maybe[ContextualLogger] = NoVal): UserState = {
381
382     var _workingUserState = startingUserState
383
384     for(currentResourceEvent <- resourceEvents) {
385
386       _workingUserState = processResourceEvent(
387         _workingUserState,
388         userStateWorker,
389         currentResourceEvent,
390         policyStore,
391         stateChangeReason,
392         billingMonthInfo,
393         walletEntriesBuffer,
394         algorithmCompiler,
395         clogM
396       )
397     }
398
399     _workingUserState
400   }
401
402
403   def doFullMonthlyBilling(userId: String,
404                            billingMonthInfo: BillingMonthInfo,
405                            storeProvider: StoreProvider,
406                            currentUserState: UserState,
407                            defaultResourcesMap: DSLResourcesMap,
408                            accounting: Accounting,
409                            algorithmCompiler: CostPolicyAlgorithmCompiler,
410                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
411                            contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
412
413
414     val clog = ContextualLogger.fromOther(
415       contextualLogger,
416       logger,
417       "doFullMonthlyBilling(%s)", billingMonthInfo)
418     clog.begin()
419
420     val clogJ = Just(clog)
421
422     val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
423       userId,
424       billingMonthInfo.previousMonth,
425       storeProvider,
426       currentUserState,
427       defaultResourcesMap,
428       accounting,
429       algorithmCompiler,
430       calculationReason.forPreviousBillingMonth,
431       clogJ
432     )
433     
434     if(previousBillingMonthUserStateM.isNoVal) {
435       throw new Exception("Could not calculate initial user state for billing %s".format(billingMonthInfo))
436     }
437     if(previousBillingMonthUserStateM.isFailed) {
438       throw failedForSure(previousBillingMonthUserStateM).exception
439     }
440
441     val startingUserState = justForSure(previousBillingMonthUserStateM).get
442
443     val userStateStore = storeProvider.userStateStore
444     val resourceEventStore = storeProvider.resourceEventStore
445     val policyStore = storeProvider.policyStore
446
447     val billingMonthStartMillis = billingMonthInfo.startMillis
448     val billingMonthEndMillis = billingMonthInfo.stopMillis
449
450     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
451     // specified in the parameters.
452     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
453     var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
454
455     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
456
457     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
458
459     // First, find and process the actual resource events from DB
460     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
461       userId,
462       billingMonthStartMillis,
463       billingMonthEndMillis)
464
465     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
466
467     _workingUserState = processResourceEvents(
468       allResourceEventsForMonth,
469       _workingUserState,
470       userStateWorker,
471       policyStore,
472       calculationReason,
473       billingMonthInfo,
474       newWalletEntries,
475       algorithmCompiler,
476       clogJ
477     )
478
479     // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
480     // ... in order to generate an implicit ON later
481     val (specialEvents, theirImplicitEnds) = userStateWorker.
482       findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
483     if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
484       clog.debug("")
485       clog.debug("Process implicitly issued events")
486       clog.debugSeq("specialEvents", specialEvents, 0)
487       clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
488     }
489
490     // Now, the previous and implicitly started must be our base for the following computation, so we create an
491     // appropriate worker
492     val specialUserStateWorker = UserStateWorker(
493       userStateWorker.userId,
494       LatestResourceEventsWorker.fromList(specialEvents),
495       ImplicitlyIssuedResourceEventsWorker.Empty,
496       IgnoredFirstResourceEventsWorker.Empty,
497       userStateWorker.accounting,
498       userStateWorker.resourcesMap
499     )
500
501     _workingUserState = processResourceEvents(
502       theirImplicitEnds,
503       _workingUserState,
504       specialUserStateWorker,
505       policyStore,
506       calculationReason,
507       billingMonthInfo,
508       newWalletEntries,
509       algorithmCompiler,
510       clogJ
511     )
512
513     val lastUpdateTime = TimeHelpers.nowMillis
514
515     _workingUserState = _workingUserState.copy(
516       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
517       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
518       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
519       parentUserStateId = startingUserState.idOpt,
520       newWalletEntries = newWalletEntries.toList
521     )
522
523     clog.debug("calculationReason = %s", calculationReason)
524
525     if(calculationReason.shouldStoreUserState) {
526       val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
527       storedUserStateM match {
528         case Just(storedUserState) ⇒
529           clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
530           _workingUserState = storedUserState
531         case NoVal ⇒
532           clog.warn("Could not store %s", _workingUserState)
533         case failed @ Failed(e, m) ⇒
534           clog.error(e, "Could not store %s", _workingUserState)
535       }
536     }
537
538     clog.debug("RETURN %s", _workingUserState)
539     clog.end()
540     _workingUserState
541   }
542 }
543
544 /**
545  * A helper object holding intermediate state/results during resource event processing.
546  *
547  * @param previousResourceEvents
548  *          This is a collection of all the latest resource events.
549  *          We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
550  *          ones. Will be updated on processing the next resource event.
551  *
552  * @param implicitlyIssuedStartEvents
553  *          The implicitly issued resource events at the beginning of the billing period.
554  *
555  * @param ignoredFirstResourceEvents
556  *          The resource events that were first (and unused) of their kind.
557  *
558  * @author Christos KK Loverdos <loverdos@gmail.com>
559  */
560 case class UserStateWorker(userId: String,
561                            previousResourceEvents: LatestResourceEventsWorker,
562                            implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
563                            ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
564                            accounting: Accounting,
565                            resourcesMap: DSLResourcesMap) {
566
  /**
   * Finds the previous resource event by checking two possible sources, in this order: a) the implicitly issued
   * resource events and b) the explicit previous resource events. If the event is found, it is removed from the
   * respective source.
   *
   * If the event is not found, then this must be for a new resource instance
   * (and probably then some `zero` resource event must be implied as the previous one).
   *
   * @param resource   the name of the resource of the event to look for
   * @param instanceId the id of the resource instance of the event to look for
   * @return the previous resource event wrapped in a `Just` if one is found, otherwise the non-`Just` result
   *         of the lookup
   */
579   def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
580     // implicitly issued events are checked first
581     implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
582       case just @ Just(_) ⇒
583         just
584       case NoVal ⇒
585         // explicit previous resource events are checked second
586         previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
587           case just @ Just(_) ⇒
588             just
589           case noValOrFailed ⇒
590             noValOrFailed
591         }
592       case failed ⇒
593         failed
594     }
595   }
596
597   def updateIgnored(resourceEvent: ResourceEvent): Unit = {
598     ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
599   }
600
601   def updatePrevious(resourceEvent: ResourceEvent): Unit = {
602     previousResourceEvents.updateResourceEvent(resourceEvent)
603   }
604
605   def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = {
606     if(previousResourceEvents.size > 0) {
607       val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
608       clog.debugMap("previousResourceEvents", map, 0)
609     }
610     if(implicitlyIssuedStartEvents.size > 0) {
611       val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
612       clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
613     }
614     if(ignoredFirstResourceEvents.size > 0) {
615       val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
616       clog.debugMap("ignoredFirstResourceEvents", map, 0)
617     }
618   }
619
620 //  private[this]
621 //  def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
622 //    val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
623 //
624 //    buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
625 //    buffer ++= previousResourceEvents.latestEventsMap
626 //
627 //    buffer.valuesIterator.toList
628 //  }
629
630   /**
631    * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
632    * end events along with those implicitly issued events. Before returning, remove the events that generated the
633    * implicit ends from the internal state of this instance.
634    *
635    * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
636    */
637   def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
638                                                 ): (List[ResourceEvent], List[ResourceEvent]) = {
639     val buffer = mutable.ListBuffer[(ResourceEvent, ResourceEvent)]()
640     val checkSet = mutable.Set[ResourceEvent]()
641     
642     def doItFor(map: ResourceEvent.FullMutableResourceTypeMap): Unit = {
643       val resourceEvents = map.valuesIterator
644       for {
645         resourceEvent <- resourceEvents
646         dslResource   <- resourcesMap.findResource(resourceEvent.safeResource)
647         costPolicy    =  dslResource.costPolicy
648       } {
649         if(costPolicy.supportsImplicitEvents) {
650           if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
651             val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
652             
653             if(!checkSet.contains(resourceEvent)) {
654               checkSet.add(resourceEvent)
655               buffer append ((resourceEvent, implicitEnd))
656             }
657
658             // remove it anyway
659             map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
660           }
661         }
662       }
663     }
664
665     doItFor(previousResourceEvents.latestEventsMap)                // we give priority for previous
666     doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
667
668     (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
669   }
670 }
671
object UserStateWorker {
  /**
   * Creates a worker for the given user state.
   *
   * The latest and the implicitly issued resource events of the worker are the mutable workers obtained from
   * the respective snapshots of the user state, while the ignored first events start from
   * `IgnoredFirstResourceEventsWorker.Empty`.
   *
   * @param userState    the user state whose id and snapshots seed the worker
   * @param accounting   the accounting to attach to the worker
   * @param resourcesMap the resources map to attach to the worker
   * @return the new worker
   */
  def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
    val latestEventsWorker = userState.latestResourceEventsSnapshot.toMutableWorker
    val implicitlyIssuedWorker = userState.implicitlyIssuedSnapshot.toMutableWorker

    UserStateWorker(
      userId = userState.userId,
      previousResourceEvents = latestEventsWorker,
      implicitlyIssuedStartEvents = implicitlyIssuedWorker,
      ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty,
      accounting = accounting,
      resourcesMap = resourcesMap
    )
  }
}