02cc0ca8439e52d47ec45627264dd4797a660d8f
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user
37
38
39 import scala.collection.mutable
40 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
41 import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
42 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
43 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
44 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
45 import gr.grnet.aquarium.logic.accounting.Accounting
46 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
47 import gr.grnet.aquarium.AquariumException
48 import gr.grnet.aquarium.events.{NewWalletEntry, ResourceEvent}
49
50 /**
51  *
52  * @author Christos KK Loverdos <loverdos@gmail.com>
53  */
54 class UserStateComputations extends Loggable {
55   def createInitialUserState(userId: String,
56                              userCreationMillis: Long,
57                              isActive: Boolean,
58                              credits: Double,
59                              roleNames: List[String] = List(),
60                              agreementName: String = DSLAgreement.DefaultAgreementName) = {
61     val now = userCreationMillis
62
63     UserState(
64       userId,
65       userCreationMillis,
66       0L,
67       false,
68       null,
69       ImplicitlyIssuedResourceEventsSnapshot(List(), now),
70       Nil,
71       Nil,
72       LatestResourceEventsSnapshot(List(), now),
73       0L,
74       0L,
75       ActiveStateSnapshot(isActive, now),
76       CreditSnapshot(credits, now),
77       AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
78       RolesSnapshot(roleNames, now),
79       OwnedResourcesSnapshot(Nil, now),
80       Nil,
81       UserStateChangeReasonCodes.InitialCalculationCode,
82       InitialUserStateCalculation
83     )
84   }
85
86   def createInitialUserStateFrom(us: UserState): UserState = {
87     createInitialUserState(
88       us.userId,
89       us.userCreationMillis,
90       us.activeStateSnapshot.isActive,
91       us.creditsSnapshot.creditAmount,
92       us.rolesSnapshot.roles,
93       us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
94     )
95   }
96
97   def findUserStateAtEndOfBillingMonth(userId: String,
98                                        billingMonthInfo: BillingMonthInfo,
99                                        storeProvider: StoreProvider,
100                                        currentUserState: UserState,
101                                        defaultResourcesMap: DSLResourcesMap,
102                                        accounting: Accounting,
103                                        algorithmCompiler: CostPolicyAlgorithmCompiler,
104                                        calculationReason: UserStateChangeReason,
105                                        contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
106
107     val clog = ContextualLogger.fromOther(
108       contextualLogger,
109       logger,
110       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
111     clog.begin()
112
113     def doCompute: Maybe[UserState] = {
114       doFullMonthlyBilling(
115         userId,
116         billingMonthInfo,
117         storeProvider,
118         currentUserState,
119         defaultResourcesMap,
120         accounting,
121         algorithmCompiler,
122         calculationReason,
123         Just(clog))
124     }
125
126     val userStateStore = storeProvider.userStateStore
127     val resourceEventStore = storeProvider.resourceEventStore
128
129     val userCreationMillis = currentUserState.userCreationMillis
130     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
131     val billingMonthStartMillis = billingMonthInfo.startMillis
132     val billingMonthStopMillis  = billingMonthInfo.stopMillis
133
134     if(billingMonthStopMillis < userCreationMillis) {
135       // If the user did not exist for this billing month, piece of cake
136       clog.debug("User did not exist before %s", userCreationDateCalc)
137
138       // NOTE: Reason here will be: InitialUserStateCalculation
139       val initialUserState0 = createInitialUserStateFrom(currentUserState)
140       val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
141
142       clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
143       clog.end()
144
145       initialUserStateM
146     } else {
147       // Ask DB cache for the latest known user state for this billing period
148       val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
149         userId,
150         billingMonthInfo.year,
151         billingMonthInfo.month)
152
153       latestUserStateM match {
154         case NoVal ⇒
155           // Not found, must compute
156           clog.debug("No user state found from cache, will have to (re)compute")
157           val result = doCompute
158           clog.end()
159           result
160
161         case failed @ Failed(e) ⇒
162           clog.warn("Failure while quering cache for user state: %s", failed)
163           clog.end()
164           failed
165
166         case Just(latestUserState) ⇒
167           // Found a "latest" user state but need to see if it is indeed the true and one latest.
168           // For this reason, we must count the events again.
169          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
170          val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
171            userId,
172            billingMonthStartMillis,
173            billingMonthStopMillis)
174
175          actualOOSEventsCounterM match {
176            case NoVal ⇒
177              val errMsg = "No counter computed for out of sync events. Should at least be zero."
178              clog.warn(errMsg)
179              val result = Failed(new AquariumException(errMsg))
180              clog.end()
181              result
182
183            case failed @ Failed(_) ⇒
184              clog.warn("Failure while querying for out of sync events: %s", failed)
185              clog.end()
186              failed
187
188            case Just(actualOOSEventsCounter) ⇒
189              val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
190              counterDiff match {
191                // ZERO, we are OK!
192                case 0 ⇒
193                  // NOTE: Keep the caller's calculation reason
194                  Just(latestUserState.copyForChangeReason(calculationReason))
195
196                // We had more, so must recompute
197                case n if n > 0 ⇒
198                  clog.debug(
199                    "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
200                  val result = doCompute
201                  clog.end()
202                  result
203
204                // We had less????
205                case n if n < 0 ⇒
206                  val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
207                  clog.warn(errMsg)
208                  val result = Failed(new AquariumException(errMsg))
209                  clog.end()
210                  result
211              }
212          }
213       }
214     }
215   }
216
217   //+ Utility methods
218   def rcDebugInfo(rcEvent: ResourceEvent) = {
219     rcEvent.toDebugString(false)
220   }
221   //- Utility methods
222
223   def processResourceEvent(startingUserState: UserState,
224                            userStateWorker: UserStateWorker,
225                            currentResourceEvent: ResourceEvent,
226                            policyStore: PolicyStore,
227                            stateChangeReason: UserStateChangeReason,
228                            billingMonthInfo: BillingMonthInfo,
229                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
230                            algorithmCompiler: CostPolicyAlgorithmCompiler,
231                            clogM: Maybe[ContextualLogger] = NoVal): UserState = {
232
233     val clog = ContextualLogger.fromOther(clogM, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
234
235     var _workingUserState = startingUserState
236
237     val theResource = currentResourceEvent.safeResource
238     val theInstanceId = currentResourceEvent.safeInstanceId
239     val theValue = currentResourceEvent.value
240
241     val accounting = userStateWorker.accounting
242     val resourcesMap = userStateWorker.resourcesMap
243
244     val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
245     clog.begin(currentResourceEventDebugInfo)
246
247     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
248
249     // Ignore the event if it is not billable (but still record it in the "previous" stuff).
250     // But to make this decision, first we need the resource definition (and its cost policy).
251     val dslResourceOpt = resourcesMap.findResource(theResource)
252     dslResourceOpt match {
253       // We have a resource (and thus a cost policy)
254       case Some(dslResource) ⇒
255         val costPolicy = dslResource.costPolicy
256         clog.debug("Cost policy %s for %s", costPolicy, dslResource)
257         val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
258         if(!isBillable) {
259           // The resource event is not billable
260           clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
261         } else {
262           // The resource event is billable
263           // Find the previous event.
264           // This is (potentially) needed to calculate new credit amount and new resource instance amount
265           val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
266           clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
267
268           val havePreviousResourceEvent = previousResourceEventM.isJust
269           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
270           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
271             // This must be the first resource event of its kind, ever.
272             // TODO: We should normally check the DB to verify the claim (?)
273             clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
274             userStateWorker.updateIgnored(currentResourceEvent)
275           } else {
276             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
277             val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
278             val oldCredits = _workingUserState.creditsSnapshot.creditAmount
279
280             // A. Compute new resource instance accumulating amount
281             val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
282
283             clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
284
285             // B. Compute new wallet entries
286             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
287             val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
288
289             //              clog.debug("Computing full chargeslots")
290             val fullChargeslotsM = accounting.computeFullChargeslots(
291               previousResourceEventM,
292               currentResourceEvent,
293               oldCredits,
294               oldAmount,
295               newAmount,
296               dslResource,
297               resourcesMap,
298               alltimeAgreements,
299               algorithmCompiler,
300               policyStore,
301               Just(clog)
302             )
303
304             // We have the chargeslots, let's associate them with the current event
305             fullChargeslotsM match {
306               case Just((referenceTimeslot, fullChargeslots)) ⇒
307                 if(fullChargeslots.length == 0) {
308                   // At least one chargeslot is required.
309                   throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
310                 }
311                 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
312
313                 // C. Compute new credit amount (based on the charge slots)
314                 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
315                 val newCredits = oldCredits - newCreditsDiff
316
317                 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
318                   val newWalletEntry = NewWalletEntry(
319                     userStateWorker.userId,
320                     newCreditsDiff,
321                     oldCredits,
322                     newCredits,
323                     TimeHelpers.nowMillis(),
324                     referenceTimeslot,
325                     billingMonthInfo.year,
326                     billingMonthInfo.month,
327                     if(havePreviousResourceEvent)
328                       List(currentResourceEvent, justForSure(previousResourceEventM).get)
329                     else
330                       List(currentResourceEvent),
331                     fullChargeslots,
332                     dslResource,
333                     currentResourceEvent.isSynthetic
334                   )
335                   clog.debug("New %s", newWalletEntry)
336
337                   walletEntriesBuffer += newWalletEntry
338                 } else {
339                   clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
340                 }
341
342                 _workingUserState = _workingUserState.copy(
343                   creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
344                   stateChangeCounter = _workingUserState.stateChangeCounter + 1,
345                   totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
346                 )
347
348               case NoVal ⇒
349                 // At least one chargeslot is required.
350                 throw new AquariumException("No chargeslots computed")
351
352               case failed@Failed(e) ⇒
353                 throw new AquariumException(e, "Error computing chargeslots")
354             }
355           }
356         }
357
358         // After processing, all events billable or not update the previous state
359         userStateWorker.updatePrevious(currentResourceEvent)
360
361         _workingUserState = _workingUserState.copy(
362           latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
363         )
364
365       // We do not have a resource (and thus, no cost policy)
366       case None ⇒
367         // Now, this is a matter of politics: what do we do if no policy was found?
368         clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
369     } // dslResourceOpt match
370
371     clog.end(currentResourceEventDebugInfo)
372
373     _workingUserState
374   }
375
376   def processResourceEvents(resourceEvents: Traversable[ResourceEvent],
377                             startingUserState: UserState,
378                             userStateWorker: UserStateWorker,
379                             policyStore: PolicyStore,
380                             stateChangeReason: UserStateChangeReason,
381                             billingMonthInfo: BillingMonthInfo,
382                             walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
383                             algorithmCompiler: CostPolicyAlgorithmCompiler,
384                             clogM: Maybe[ContextualLogger] = NoVal): UserState = {
385
386     var _workingUserState = startingUserState
387
388     for(currentResourceEvent <- resourceEvents) {
389
390       _workingUserState = processResourceEvent(
391         _workingUserState,
392         userStateWorker,
393         currentResourceEvent,
394         policyStore,
395         stateChangeReason,
396         billingMonthInfo,
397         walletEntriesBuffer,
398         algorithmCompiler,
399         clogM
400       )
401     }
402
403     _workingUserState
404   }
405
406
407   def doFullMonthlyBilling(userId: String,
408                            billingMonthInfo: BillingMonthInfo,
409                            storeProvider: StoreProvider,
410                            currentUserState: UserState,
411                            defaultResourcesMap: DSLResourcesMap,
412                            accounting: Accounting,
413                            algorithmCompiler: CostPolicyAlgorithmCompiler,
414                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
415                            contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
416
417
418     val clog = ContextualLogger.fromOther(
419       contextualLogger,
420       logger,
421       "doFullMonthlyBilling(%s)", billingMonthInfo)
422     clog.begin()
423
424     val clogJ = Just(clog)
425
426     val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
427       userId,
428       billingMonthInfo.previousMonth,
429       storeProvider,
430       currentUserState,
431       defaultResourcesMap,
432       accounting,
433       algorithmCompiler,
434       calculationReason.forPreviousBillingMonth,
435       clogJ
436     )
437
438     if(previousBillingMonthUserStateM.isNoVal) {
439       throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
440     }
441     if(previousBillingMonthUserStateM.isFailed) {
442       throw failedForSure(previousBillingMonthUserStateM).exception
443     }
444
445     val startingUserState = justForSure(previousBillingMonthUserStateM).get
446
447     val userStateStore = storeProvider.userStateStore
448     val resourceEventStore = storeProvider.resourceEventStore
449     val policyStore = storeProvider.policyStore
450
451     val billingMonthStartMillis = billingMonthInfo.startMillis
452     val billingMonthEndMillis = billingMonthInfo.stopMillis
453
454     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
455     // specified in the parameters.
456     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
457     var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
458
459     val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
460
461     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
462
463     // First, find and process the actual resource events from DB
464     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
465       userId,
466       billingMonthStartMillis,
467       billingMonthEndMillis)
468
469     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
470
471     _workingUserState = processResourceEvents(
472       allResourceEventsForMonth,
473       _workingUserState,
474       userStateWorker,
475       policyStore,
476       calculationReason,
477       billingMonthInfo,
478       newWalletEntries,
479       algorithmCompiler,
480       clogJ
481     )
482
483     // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
484     // ... in order to generate an implicit ON later
485     val (specialEvents, theirImplicitEnds) = userStateWorker.
486       findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
487     if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
488       clog.debug("")
489       clog.debug("Process implicitly issued events")
490       clog.debugSeq("specialEvents", specialEvents, 0)
491       clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
492     }
493
494     // Now, the previous and implicitly started must be our base for the following computation, so we create an
495     // appropriate worker
496     val specialUserStateWorker = UserStateWorker(
497       userStateWorker.userId,
498       LatestResourceEventsWorker.fromList(specialEvents),
499       ImplicitlyIssuedResourceEventsWorker.Empty,
500       IgnoredFirstResourceEventsWorker.Empty,
501       userStateWorker.accounting,
502       userStateWorker.resourcesMap
503     )
504
505     _workingUserState = processResourceEvents(
506       theirImplicitEnds,
507       _workingUserState,
508       specialUserStateWorker,
509       policyStore,
510       calculationReason,
511       billingMonthInfo,
512       newWalletEntries,
513       algorithmCompiler,
514       clogJ
515     )
516
517     val lastUpdateTime = TimeHelpers.nowMillis()
518
519     _workingUserState = _workingUserState.copy(
520       implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
521       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
522       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
523       parentUserStateId = startingUserState.idOpt,
524       newWalletEntries = newWalletEntries.toList
525     )
526
527     clog.debug("calculationReason = %s", calculationReason)
528
529     if(calculationReason.shouldStoreUserState) {
530       val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
531       storedUserStateM match {
532         case Just(storedUserState) ⇒
533           clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
534           _workingUserState = storedUserState
535         case NoVal ⇒
536           clog.warn("Could not store %s", _workingUserState)
537         case failed @ Failed(e) ⇒
538           clog.error(e, "Could not store %s", _workingUserState)
539       }
540     }
541
542     clog.debug("RETURN %s", _workingUserState)
543     clog.end()
544     _workingUserState
545   }
546 }
547
/**
 * A helper object holding intermediate state/results during resource event processing.
 *
 * The three `...Worker` members are mutable: the methods of this class update them in place.
 *
 * @param userId
 *          The id of the user whose resource events are processed.
 *
 * @param previousResourceEvents
 *          This is a collection of all the latest resource events.
 *          We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
 *          ones. Will be updated on processing the next resource event.
 *
 * @param implicitlyIssuedStartEvents
 *          The implicitly issued resource events at the beginning of the billing period.
 *
 * @param ignoredFirstResourceEvents
 *          The resource events that were first (and unused) of their kind.
 *
 * @param accounting
 *          The accounting logic. It is not used by the methods of this class, only carried along.
 *
 * @param resourcesMap
 *          Used to look up the resource definition (and thus the cost policy) of a resource event.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
case class UserStateWorker(userId: String,
                           previousResourceEvents: LatestResourceEventsWorker,
                           implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
                           ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
                           accounting: Accounting,
                           resourcesMap: DSLResourcesMap) {

  /**
   * Finds the previous resource event by checking two possible sources: a) the implicitly issued start
   * events and b) the explicit previous resource events, in this order. If the event is found, it is
   * removed from the respective source.
   *
   * The second source is consulted only when the first one answers `NoVal`; a failure of the first
   * source is returned as is.
   *
   * If the event is not found, then this must be for a new resource instance.
   * (and probably then some `zero` resource event must be implied as the previous one)
   *
   * @param resource   the resource name of the event we are looking for
   * @param instanceId the resource instance id of the event we are looking for
   * @return the previous event, `NoVal` if neither source has it, or the failure of the lookup
   */
  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
    // implicitly issued events are checked first
    implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
      case NoVal ⇒
        // explicit previous resource events are checked second; whatever they answer is the result
        previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId)
      case justOrFailed ⇒
        justOrFailed
    }
  }

  /** Records the given event as a first-of-its-kind event that was ignored. */
  def updateIgnored(resourceEvent: ResourceEvent): Unit = {
    ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
  }

  /** Records the given event in the collection of previous (latest) resource events. */
  def updatePrevious(resourceEvent: ResourceEvent): Unit = {
    previousResourceEvents.updateResourceEvent(resourceEvent)
  }

  /**
   * Logs, at debug level, the contents of the three internal event maps. Empty maps are not logged.
   *
   * @param clog        the logger to write to
   * @param rcDebugInfo how to render each resource event
   */
  def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = {
    if(previousResourceEvents.size > 0) {
      val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      clog.debugMap("previousResourceEvents", map, 0)
    }
    if(implicitlyIssuedStartEvents.size > 0) {
      val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      // NOTE(review): the label says "implicitlyTerminated" although the map holds the implicitly issued events.
      clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
    }
    if(ignoredFirstResourceEvents.size > 0) {
      val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      clog.debugMap("ignoredFirstResourceEvents", map, 0)
    }
  }

//  private[this]
//  def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
//    val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
//
//    buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
//    buffer ++= previousResourceEvents.latestEventsMap
//
//    buffer.valuesIterator.toList
//  }

  /**
   * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
   * end events along with those implicitly issued events. Before returning, remove the events that generated the
   * implicit ends from the internal state of this instance.
   *
   * An event that appears in both sources contributes only once; the previous resource events are
   * examined first.
   *
   * @param newOccuredMillis passed to the cost policy when constructing each implicit end event
   * @return the generator events and, at the same positions, the implicit end events they generated
   *
   * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
   */
  def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
                                                ): (List[ResourceEvent], List[ResourceEvent]) = {
    val generatorsWithEnds = mutable.ListBuffer[(ResourceEvent, ResourceEvent)]()
    val alreadyCollected = mutable.Set[ResourceEvent]()

    def doItFor(map: ResourceEvent.FullMutableResourceTypeMap): Unit = {
      // Iterate over a snapshot of the values: entries are removed from `map` inside the loop, and
      // removing from a mutable map while its own iterator is being consumed has unspecified results.
      val resourceEvents = map.valuesIterator.toList

      for(resourceEvent <- resourceEvents) {
        // events of unknown resources are left alone
        resourcesMap.findResource(resourceEvent.safeResource).foreach { dslResource ⇒
          val costPolicy = dslResource.costPolicy

          if(costPolicy.supportsImplicitEvents && costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
            val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)

            // `add` answers true only the first time we see this event
            if(alreadyCollected.add(resourceEvent)) {
              generatorsWithEnds += ((resourceEvent, implicitEnd))
            }

            // remove it anyway
            map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
          }
        }
      }
    }

    doItFor(previousResourceEvents.latestEventsMap)                // we give priority for previous
    doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...

    generatorsWithEnds.toList.unzip
  }
}
675
object UserStateWorker {
  /**
   * Creates a worker for the given user state.
   *
   * The latest-resource-events and implicitly-issued snapshots of the state are converted to their
   * mutable worker counterparts; the ignored-first-events worker starts out as
   * `IgnoredFirstResourceEventsWorker.Empty`.
   *
   * @param userState    the state providing the user id and the two snapshots
   * @param accounting   stored in the worker as is
   * @param resourcesMap stored in the worker as is
   */
  def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker =
    UserStateWorker(
      userId                      = userState.userId,
      previousResourceEvents      = userState.latestResourceEventsSnapshot.toMutableWorker,
      implicitlyIssuedStartEvents = userState.implicitlyIssuedSnapshot.toMutableWorker,
      ignoredFirstResourceEvents  = IgnoredFirstResourceEventsWorker.Empty,
      accounting                  = accounting,
      resourcesMap                = resourcesMap
    )
}