ec34ac6eb7429d4e88a011a17625b20b234c1d5b
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user
37
38
39 import scala.collection.mutable
40 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
41 import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
42 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
43 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
44 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
45 import gr.grnet.aquarium.logic.accounting.Accounting
46 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
47 import gr.grnet.aquarium.event.{NewWalletEntry}
48 import gr.grnet.aquarium.event.resource.ResourceEventModel
49 import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
50 import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
51 import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
52
/**
 * Computations that create and (re)calculate `UserState` instances: construction of the initial
 * state, lookup of the state at the end of a billing month and full monthly billing over the
 * resource events of a user.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
class UserStateComputations extends Loggable {
  /**
   * Assembles the pristine user state that both `createInitialUserState` overloads return.
   *
   * @param userID             the user the state belongs to
   * @param userCreationMillis the creation time of the user; also the start of the initial agreement
   * @param snapshotMillis     the timestamp given to every snapshot contained in the new state
   * @param imEvent            the IM event recorded in the IM state snapshot
   * @param credits            the initial credit amount
   * @param agreementName      the name of the initial agreement
   */
  private[this] def newInitialUserState(userID: String,
                                        userCreationMillis: Long,
                                        snapshotMillis: Long,
                                        imEvent: IMEventModel,
                                        credits: Double,
                                        agreementName: String): UserState = {
    UserState(
      true,
      userID,
      userCreationMillis,
      0L,
      false,
      null,
      ImplicitlyIssuedResourceEventsSnapshot(List(), snapshotMillis),
      Nil,
      Nil,
      LatestResourceEventsSnapshot(List(), snapshotMillis),
      0L,
      0L,
      IMStateSnapshot(imEvent, snapshotMillis),
      CreditSnapshot(credits, snapshotMillis),
      AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), snapshotMillis),
      OwnedResourcesSnapshot(Nil, snapshotMillis),
      Nil,
      InitialUserStateSetup
    )
  }

  /**
   * Creates the initial user state out of the IM event that created the user.
   *
   * The user creation time is `imEvent.occurredMillis`, while all the contained snapshots are
   * stamped with the current time.
   *
   * @param imEvent       the IM event; it must be a user creation event (`isCreateUser`)
   * @param credits       the initial credit amount
   * @param agreementName the name of the agreement that is valid from the user creation time
   * @throws AquariumInternalError if `imEvent` is not a user creation event
   */
  def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String): UserState = {
    if(!imEvent.isCreateUser) {
      throw new AquariumInternalError(
        "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
    }

    val userID = imEvent.userID
    val userCreationMillis = imEvent.occurredMillis
    val now = TimeHelpers.nowMillis()

    newInitialUserState(userID, userCreationMillis, now, imEvent, credits, agreementName)
  }

  /**
   * Creates the initial user state when no IM event is at hand. A user creation IM event is
   * synthesized from the given parameters and the user creation time is used as the timestamp
   * of all the contained snapshots.
   *
   * @param userID             the user the state belongs to
   * @param userCreationMillis the creation time of the user
   * @param isActive           the activity flag recorded in the synthesized IM event
   * @param credits            the initial credit amount
   * @param roleNames          only the first role name is used; "default" is used if the list is empty
   * @param agreementName      the name of the initial agreement
   */
  def createInitialUserState(userID: String,
                             userCreationMillis: Long,
                             isActive: Boolean,
                             credits: Double,
                             roleNames: List[String] = List(),
                             agreementName: String = DSLAgreement.DefaultAgreementName): UserState = {

    val syntheticCreateEvent = StdIMEvent(
      "",
      userCreationMillis, userCreationMillis, userID,
      "",
      isActive, roleNames.headOption.getOrElse("default"),
      "1.0",
      IMEventModel.EventTypeNames.create, Map())

    newInitialUserState(userID, userCreationMillis, userCreationMillis, syntheticCreateEvent, credits, agreementName)
  }

  /**
   * Creates a fresh initial user state that re-uses the IM event, the credit amount and the last
   * agreement (in the iteration order of `agreementsByTimeslot`) of the given user state.
   *
   * If the given state carries no agreement at all, `DSLAgreement.DefaultAgreementName` is used
   * instead of failing with an uninformative `NoSuchElementException`.
   *
   * @param us the user state to take the details from
   */
  def createInitialUserStateFrom(us: UserState): UserState = {
    val agreementName = us.agreementsSnapshot.agreementsByTimeslot.values.lastOption.
      getOrElse(DSLAgreement.DefaultAgreementName)

    createInitialUserState(
      us.imStateSnapshot.imEvent,
      us.creditsSnapshot.creditAmount,
      agreementName)
  }

  /**
   * Finds the user state at the end of the given billing month, computing it if needed.
   *
   *  - If the billing month ends before the user was created, a new initial user state is
   *    inserted into the user state store and the outcome of the insertion is returned.
   *  - Otherwise the user state store is asked for the latest state of that billing month.
   *    A stored state is re-used only if the number of out of sync resource events recorded in
   *    it equals the number currently reported by the resource event store. If more such events
   *    exist now, or no state is stored, the month is billed via `doFullMonthlyBilling`.
   *
   * The contextual logger is ended exactly once, right before the result is returned.
   *
   * @param userId              the user to compute the state for
   * @param billingMonthInfo    the billing month of interest
   * @param storeProvider       provides the user state store and the resource event store
   * @param currentUserState    the current user state; its creation time and details are used
   * @param defaultResourcesMap passed on to `doFullMonthlyBilling`
   * @param accounting          passed on to `doFullMonthlyBilling`
   * @param algorithmCompiler   passed on to `doFullMonthlyBilling`
   * @param calculationReason   the reason set on a re-used state and passed on to `doFullMonthlyBilling`
   * @param clogOpt             an optional parent contextual logger
   * @return the user state, or a `Failed` if a store query fails or if fewer out of sync events
   *         than the recorded ones are found
   */
  def findUserStateAtEndOfBillingMonth(userId: String,
                                       billingMonthInfo: BillingMonthInfo,
                                       storeProvider: StoreProvider,
                                       currentUserState: UserState,
                                       defaultResourcesMap: DSLResourcesMap,
                                       accounting: Accounting,
                                       algorithmCompiler: CostPolicyAlgorithmCompiler,
                                       calculationReason: UserStateChangeReason,
                                       clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = {

    val clog = ContextualLogger.fromOther(
      clogOpt,
      logger,
      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
    clog.begin()

    def doCompute: Maybe[UserState] = {
      doFullMonthlyBilling(
        userId,
        billingMonthInfo,
        storeProvider,
        currentUserState,
        defaultResourcesMap,
        accounting,
        algorithmCompiler,
        calculationReason,
        Some(clog))
    }

    val userStateStore = storeProvider.userStateStore
    val resourceEventStore = storeProvider.resourceEventStore

    val userCreationMillis = currentUserState.userCreationMillis
    val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
    val billingMonthStartMillis = billingMonthInfo.startMillis
    val billingMonthStopMillis  = billingMonthInfo.stopMillis

    // Decides whether a stored state is still valid by counting the out of sync events again.
    def verifyOrRecompute(latestUserState: UserState): Maybe[UserState] = {
      val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter

      resourceEventStore.countOutOfSyncEventsForBillingPeriod(
        userId,
        billingMonthStartMillis,
        billingMonthStopMillis) match {

        case NoVal ⇒
          val errMsg = "No counter computed for out of sync events. Should at least be zero."
          clog.warn(errMsg)
          Failed(new AquariumException(errMsg))

        case failed @ Failed(_) ⇒
          clog.warn("Failure while querying for out of sync events: %s", failed)
          failed

        case Just(actualOOSEventsCounter) ⇒
          val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter

          if(counterDiff == 0) {
            // ZERO, we are OK!
            // NOTE: Keep the caller's calculation reason
            Just(latestUserState.copyForChangeReason(calculationReason))
          } else if(counterDiff > 0) {
            // We had more, so must recompute
            clog.debug(
              "Found %s out of sync events (%s more), will have to (re)compute user state",
              actualOOSEventsCounter,
              counterDiff)
            doCompute
          } else {
            // We had less????
            val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(
              actualOOSEventsCounter,
              counterDiff)
            clog.warn(errMsg)
            Failed(new AquariumException(errMsg))
          }
      }
    }

    val result: Maybe[UserState] =
      if(billingMonthStopMillis < userCreationMillis) {
        // If the user did not exist for this billing month, piece of cake
        clog.debug("User did not exist before %s", userCreationDateCalc)

        // NOTE: Reason here will be: InitialUserStateSetup$
        val initialUserState0 = createInitialUserStateFrom(currentUserState)
        val initialUserStateM = userStateStore.insertUserState2(initialUserState0)

        clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))

        initialUserStateM
      } else {
        // Ask DB cache for the latest known user state for this billing period.
        // A missing state is turned into null, which the NoVal case below treats as "not found".
        val latestUserStateM = Maybe {
          userStateStore.findLatestUserStateForEndOfBillingMonth(
            userId,
            billingMonthInfo.year,
            billingMonthInfo.month) match {

            case Some(latestUserState) ⇒
              latestUserState
            case None ⇒
              null
          }
        }

        latestUserStateM match {
          case NoVal ⇒
            // Not found, must compute
            clog.debug("No user state found from cache, will have to (re)compute")
            doCompute

          case failed @ Failed(_) ⇒
            clog.warn("Failure while quering cache for user state: %s", failed)
            failed

          case Just(latestUserState) ⇒
            // Found a "latest" user state but need to see if it is indeed the true and one latest.
            // For this reason, we must count the events again.
            verifyOrRecompute(latestUserState)
        }
      }

    // Single exit point, so that the logger context is always closed, whatever the path taken above.
    clog.end()
    result
  }

  //+ Utility methods
  /** Gives the debug string of a resource event, as computed by `toDebugString(false)`. */
  def rcDebugInfo(rcEvent: ResourceEventModel): String = rcEvent.toDebugString(false)
  //- Utility methods

  /**
   * Processes one resource event and returns the updated user state.
   *
   * For an event of a known resource:
   *  - a not billable event (according to the cost policy of the resource) is just logged;
   *  - an event whose cost policy needs a previous event, while none is known to the worker,
   *    is recorded as ignored;
   *  - otherwise the chargeslots are computed, the credits are decreased by their sum and,
   *    if `stateChangeReason.shouldStoreCalculatedWalletEntries`, a wallet entry is appended
   *    to `walletEntriesBuffer`.
   * In all three cases the event becomes the "previous" event in the worker and the latest
   * resource events snapshot of the state is refreshed.
   *
   * For an event of an unknown resource only a warning is logged and the state is returned as is.
   *
   * @param startingUserState    the user state before the event
   * @param userStateWorker      the mutable working data; it is updated as a side effect
   * @param currentResourceEvent the resource event to process
   * @param policyStore          passed on to the chargeslot computation
   * @param stateChangeReason    decides whether wallet entries are collected
   * @param billingMonthInfo     the billing month recorded in new wallet entries
   * @param walletEntriesBuffer  the buffer that new wallet entries are appended to
   * @param algorithmCompiler    passed on to the chargeslot computation
   * @param clogOpt              an optional parent contextual logger
   * @throws AquariumException if no chargeslots can be computed or their computation fails
   */
  def processResourceEvent(startingUserState: UserState,
                           userStateWorker: UserStateWorker,
                           currentResourceEvent: ResourceEventModel,
                           policyStore: PolicyStore,
                           stateChangeReason: UserStateChangeReason,
                           billingMonthInfo: BillingMonthInfo,
                           walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
                           algorithmCompiler: CostPolicyAlgorithmCompiler,
                           clogOpt: Option[ContextualLogger] = None): UserState = {

    // NOTE: The label of the logger context differs from the name of this method; it is left untouched.
    val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)

    var _workingUserState = startingUserState

    val theResource = currentResourceEvent.safeResource
    val theInstanceId = currentResourceEvent.safeInstanceId
    val theValue = currentResourceEvent.value

    val accounting = userStateWorker.accounting
    val resourcesMap = userStateWorker.resourcesMap

    val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
    clog.begin(currentResourceEventDebugInfo)

    userStateWorker.debugTheMaps(clog)(rcDebugInfo)

    // Ignore the event if it is not billable (but still record it in the "previous" stuff).
    // But to make this decision, first we need the resource definition (and its cost policy).
    resourcesMap.findResource(theResource) match {
      // We have a resource (and thus a cost policy)
      case Some(dslResource) ⇒
        val costPolicy = dslResource.costPolicy
        clog.debug("Cost policy %s for %s", costPolicy, dslResource)

        if(!costPolicy.isBillableEventBasedOnValue(theValue)) {
          // The resource event is not billable
          clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
        } else {
          // The resource event is billable
          // Find the previous event.
          // This is (potentially) needed to calculate new credit amount and new resource instance amount
          val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
          clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))

          val havePreviousResourceEvent = previousResourceEventM.isJust

          if(costPolicy.needsPreviousEventForCreditAndAmountCalculation && !havePreviousResourceEvent) {
            // This must be the first resource event of its kind, ever.
            // TODO: We should normally check the DB to verify the claim (?)
            clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
            userStateWorker.updateIgnored(currentResourceEvent)
          } else {
            val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
            val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
            val oldCredits = _workingUserState.creditsSnapshot.creditAmount

            // A. Compute new resource instance accumulating amount
            val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)

            clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)

            // B. Compute new wallet entries
            clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
            val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot

            val fullChargeslotsM = accounting.computeFullChargeslots(
              previousResourceEventM,
              currentResourceEvent,
              oldCredits,
              oldAmount,
              newAmount,
              dslResource,
              resourcesMap,
              alltimeAgreements,
              algorithmCompiler,
              policyStore,
              Some(clog)
            )

            // We have the chargeslots, let's associate them with the current event
            fullChargeslotsM match {
              case Just((referenceTimeslot, fullChargeslots)) ⇒
                if(fullChargeslots.isEmpty) {
                  // At least one chargeslot is required.
                  throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
                }
                clog.debugSeq("fullChargeslots", fullChargeslots, 0)

                // C. Compute new credit amount (based on the charge slots)
                val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
                val newCredits = oldCredits - newCreditsDiff

                if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
                  // The wallet entry refers to the current event and, if there is one, to its previous event
                  val involvedResourceEvents =
                    if(havePreviousResourceEvent)
                      List(currentResourceEvent, justForSure(previousResourceEventM).get)
                    else
                      List(currentResourceEvent)

                  val newWalletEntry = NewWalletEntry(
                    userStateWorker.userId,
                    newCreditsDiff,
                    oldCredits,
                    newCredits,
                    TimeHelpers.nowMillis(),
                    referenceTimeslot,
                    billingMonthInfo.year,
                    billingMonthInfo.month,
                    involvedResourceEvents,
                    fullChargeslots,
                    dslResource,
                    currentResourceEvent.isSynthetic
                  )
                  clog.debug("New %s", newWalletEntry)

                  walletEntriesBuffer += newWalletEntry
                } else {
                  clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
                }

                _workingUserState = _workingUserState.copy(
                  creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
                  stateChangeCounter = _workingUserState.stateChangeCounter + 1,
                  totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
                )

              case NoVal ⇒
                // At least one chargeslot is required.
                throw new AquariumException("No chargeslots computed")

              case Failed(e) ⇒
                throw new AquariumException(e, "Error computing chargeslots")
            }
          }
        }

        // After processing, all events billable or not update the previous state
        userStateWorker.updatePrevious(currentResourceEvent)

        _workingUserState = _workingUserState.copy(
          latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
        )

      // We do not have a resource (and thus, no cost policy)
      case None ⇒
        // Now, this is a matter of politics: what do we do if no policy was found?
        clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
    } // resource lookup match

    clog.end(currentResourceEventDebugInfo)

    _workingUserState
  }

  /**
   * Processes the given resource events one after the other (in traversal order) via
   * `processResourceEvent`, feeding the user state produced by each step to the next one.
   *
   * @param resourceEvents    the resource events to process
   * @param startingUserState the user state before the first event
   * @return the user state after the last event; `startingUserState` if there are no events
   */
  def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
                            startingUserState: UserState,
                            userStateWorker: UserStateWorker,
                            policyStore: PolicyStore,
                            stateChangeReason: UserStateChangeReason,
                            billingMonthInfo: BillingMonthInfo,
                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
                            algorithmCompiler: CostPolicyAlgorithmCompiler,
                            clogOpt: Option[ContextualLogger] = None): UserState = {

    resourceEvents.foldLeft(startingUserState) { (workingUserState, currentResourceEvent) ⇒
      processResourceEvent(
        workingUserState,
        userStateWorker,
        currentResourceEvent,
        policyStore,
        stateChangeReason,
        billingMonthInfo,
        walletEntriesBuffer,
        algorithmCompiler,
        clogOpt
      )
    }
  }


  /**
   * Bills a whole month for a user.
   *
   * The computation starts from the user state at the end of the previous billing month (found
   * via `findUserStateAtEndOfBillingMonth`), then processes all the relevant resource events of
   * the month and finally the implicit end events generated at the end of the month. If
   * `calculationReason.shouldStoreUserState`, the resulting state is inserted into the user state
   * store; a failure to store it is only logged and the computed state is still returned.
   *
   * The whole body runs inside `Maybe { ... }`, so the exceptions thrown here are not expected to
   * reach the caller directly (presumably they are reported as `Failed` - confirm with `Maybe`).
   *
   * @param userId              the user to bill
   * @param billingMonthInfo    the month to bill
   * @param storeProvider       provides the user state, resource event and policy stores
   * @param currentUserState    passed on to `findUserStateAtEndOfBillingMonth`
   * @param defaultResourcesMap the resources known to the user state worker
   * @param accounting          the accounting used by the user state worker
   * @param algorithmCompiler   passed on to the resource event processing
   * @param calculationReason   the reason of this calculation
   * @param clogOpt             an optional parent contextual logger
   */
  def doFullMonthlyBilling(userId: String,
                           billingMonthInfo: BillingMonthInfo,
                           storeProvider: StoreProvider,
                           currentUserState: UserState,
                           defaultResourcesMap: DSLResourcesMap,
                           accounting: Accounting,
                           algorithmCompiler: CostPolicyAlgorithmCompiler,
                           calculationReason: UserStateChangeReason = NoSpecificChangeReason,
                           clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = Maybe {

    val clog = ContextualLogger.fromOther(
      clogOpt,
      logger,
      "doFullMonthlyBilling(%s)", billingMonthInfo)
    clog.begin()

    val clogSome = Some(clog)

    val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
      userId,
      billingMonthInfo.previousMonth,
      storeProvider,
      currentUserState,
      defaultResourcesMap,
      accounting,
      algorithmCompiler,
      calculationReason.forPreviousBillingMonth,
      clogSome
    )

    // Without the state of the previous month there is nothing to build upon
    val startingUserState = previousBillingMonthUserStateM match {
      case Just(previousBillingMonthUserState) ⇒
        previousBillingMonthUserState
      case NoVal ⇒
        throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
      case Failed(e) ⇒
        throw e
    }

    val userStateStore = storeProvider.userStateStore
    val resourceEventStore = storeProvider.resourceEventStore
    val policyStore = storeProvider.policyStore

    val billingMonthStartMillis = billingMonthInfo.startMillis
    val billingMonthEndMillis = billingMonthInfo.stopMillis

    // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
    val initialWorkingUserState = startingUserState.copyForChangeReason(calculationReason)

    val userStateWorker = UserStateWorker.fromUserState(initialWorkingUserState, accounting, defaultResourcesMap)

    userStateWorker.debugTheMaps(clog)(rcDebugInfo)

    // First, find and process the actual resource events from DB
    val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
      userId,
      billingMonthStartMillis,
      billingMonthEndMillis)

    val collectedWalletEntries = mutable.ListBuffer[NewWalletEntry]()

    val userStateAfterActualEvents = processResourceEvents(
      allResourceEventsForMonth,
      initialWorkingUserState,
      userStateWorker,
      policyStore,
      calculationReason,
      billingMonthInfo,
      collectedWalletEntries,
      algorithmCompiler,
      clogSome
    )

    // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
    // ... in order to generate an implicit ON later
    val (specialEvents, theirImplicitEnds) = userStateWorker.
      findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
    if(specialEvents.nonEmpty || theirImplicitEnds.nonEmpty) {
      clog.debug("")
      clog.debug("Process implicitly issued events")
      clog.debugSeq("specialEvents", specialEvents, 0)
      clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
    }

    // Now, the previous and implicitly started must be our base for the following computation, so we create an
    // appropriate worker
    val specialUserStateWorker = UserStateWorker(
      userStateWorker.userId,
      LatestResourceEventsWorker.fromList(specialEvents),
      ImplicitlyIssuedResourceEventsWorker.Empty,
      IgnoredFirstResourceEventsWorker.Empty,
      userStateWorker.accounting,
      userStateWorker.resourcesMap
    )

    val userStateAfterImplicitEnds = processResourceEvents(
      theirImplicitEnds,
      userStateAfterActualEvents,
      specialUserStateWorker,
      policyStore,
      calculationReason,
      billingMonthInfo,
      collectedWalletEntries,
      algorithmCompiler,
      clogSome
    )

    val lastUpdateTime = TimeHelpers.nowMillis()

    // The snapshots are taken from the original worker, not from the special one used for the implicit ends
    val computedUserState = userStateAfterImplicitEnds.copy(
      implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
      latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
      stateChangeCounter = userStateAfterImplicitEnds.stateChangeCounter + 1,
      parentUserStateId = startingUserState.idOpt,
      newWalletEntries = collectedWalletEntries.toList
    )

    clog.debug("calculationReason = %s", calculationReason)

    val resultUserState =
      if(calculationReason.shouldStoreUserState) {
        userStateStore.insertUserState2(computedUserState) match {
          case Just(storedUserState) ⇒
            clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
            storedUserState
          case NoVal ⇒
            // Best effort: the computed state is returned even if it was not stored
            clog.warn("Could not store %s", computedUserState)
            computedUserState
          case Failed(e) ⇒
            clog.error(e, "Could not store %s", computedUserState)
            computedUserState
        }
      } else {
        computedUserState
      }

    clog.debug("RETURN %s", resultUserState)
    clog.end()
    resultUserState
  }
}
592
/**
 * A helper object holding intermediate state/results during resource event processing.
 *
 * The three workers are mutable and are updated in place by the methods of this class.
 *
 * @param userId
 *          The user whose resource events are being processed.
 *
 * @param previousResourceEvents
 *          This is a collection of all the latest resource events.
 *          We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
 *          ones. Will be updated on processing the next resource event.
 *
 * @param implicitlyIssuedStartEvents
 *          The implicitly issued resource events at the beginning of the billing period.
 *
 * @param ignoredFirstResourceEvents
 *          The resource events that were first (and unused) of their kind.
 *
 * @param accounting
 *          The accounting that is used by the resource event processing.
 *
 * @param resourcesMap
 *          The known resources; used in order to find the cost policy of a resource event.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
case class UserStateWorker(userId: String,
                           previousResourceEvents: LatestResourceEventsWorker,
                           implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
                           ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
                           accounting: Accounting,
                           resourcesMap: DSLResourcesMap) {

  /**
   * Finds the previous resource event by checking two possible sources: a) The implicitly issued start
   * events and b) the explicit previous resource events, in that order. The second source is consulted
   * only if the first one yields `NoVal`. The lookup is done with `findAndRemoveResourceEvent`, which
   * (as its name says) is expected to remove a found event from the respective source.
   *
   * If the event is not found, then this must be for a new resource instance.
   * (and probably then some `zero` resource event must be implied as the previous one)
   *
   * @param resource   the resource of the event to look for
   * @param instanceId the resource instance of the event to look for
   * @return the outcome of the first source unless it is `NoVal`, else the outcome of the second source
   */
  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
    // implicitly issued events are checked first
    implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
      case NoVal ⇒
        // explicit previous resource events are checked second
        previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId)
      case justOrFailed ⇒
        justOrFailed
    }
  }

  /** Records the given resource event in `ignoredFirstResourceEvents`. */
  def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
    ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
  }

  /** Records the given resource event in `previousResourceEvents`. */
  def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
    previousResourceEvents.updateResourceEvent(resourceEvent)
  }

  /**
   * Logs (at debug level) the contents of the three workers. Empty workers are not logged.
   *
   * @param clog        the logger to use
   * @param rcDebugInfo computes the text that is logged for each resource event
   */
  def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
    if(previousResourceEvents.size > 0) {
      val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      clog.debugMap("previousResourceEvents", map, 0)
    }
    if(implicitlyIssuedStartEvents.size > 0) {
      val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
    }
    if(ignoredFirstResourceEvents.size > 0) {
      val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
      clog.debugMap("ignoredFirstResourceEvents", map, 0)
    }
  }

//  private[this]
//  def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
//    val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
//
//    buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
//    buffer ++= previousResourceEvents.latestEventsMap
//
//    buffer.valuesIterator.toList
//  }

  /**
   * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
   * end events along with those implicitly issued events. Before returning, remove the events that generated the
   * implicit ends from the internal state of this instance.
   *
   * Events of unknown resources and events whose cost policy does not ask for an implicit end are left alone.
   * An event that is found in both sources is reported only once (but is removed from both).
   *
   * @param newOccuredMillis passed to the cost policy when the implicit end events are constructed
   * @return the generator events and their implicit end events, as two lists of the same order and length
   *
   * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
   */
  def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
                                                ): (List[ResourceEventModel], List[ResourceEventModel]) = {
    val generatorsAndEnds = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
    val alreadySeen = mutable.Set[ResourceEventModel]()

    def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
      // Take a snapshot of the values first: entries are removed from `map` inside the loop and
      // it is not safe to remove from a mutable map while one of its iterators is still in use.
      val resourceEvents = map.valuesIterator.toList

      for {
        resourceEvent <- resourceEvents
        dslResource   <- resourcesMap.findResource(resourceEvent.safeResource)
        costPolicy    =  dslResource.costPolicy
      } {
        if(costPolicy.supportsImplicitEvents && costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
          // `add` is true only for an event that has not been seen before
          if(alreadySeen.add(resourceEvent)) {
            val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
            generatorsAndEnds += ((resourceEvent, implicitEnd))
          }

          // remove it anyway
          map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
        }
      }
    }

    doItFor(previousResourceEvents.latestEventsMap)                // we give priority for previous
    doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...

    generatorsAndEnds.toList.unzip
  }
}
720
object UserStateWorker {
  /**
   * Creates a worker for the given user state. The latest resource events and the implicitly
   * issued events of the state are converted to mutable workers, while the worker for the ignored
   * first resource events starts as `IgnoredFirstResourceEventsWorker.Empty`.
   *
   * @param userState    the user state that provides the user id and the two snapshots
   * @param accounting   the accounting of the new worker
   * @param resourcesMap the resources map of the new worker
   */
  def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker =
    UserStateWorker(
      userId = userState.userID,
      previousResourceEvents = userState.latestResourceEventsSnapshot.toMutableWorker,
      implicitlyIssuedStartEvents = userState.implicitlyIssuedSnapshot.toMutableWorker,
      ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty,
      accounting = accounting,
      resourcesMap = resourcesMap
    )
}