2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.user
39 import scala.collection.mutable
40 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
41 import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
42 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
43 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
44 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
45 import gr.grnet.aquarium.logic.accounting.Accounting
46 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
47 import gr.grnet.aquarium.event.{NewWalletEntry}
48 import gr.grnet.aquarium.event.resource.ResourceEventModel
49 import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
50 import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
51 import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
55 * @author Christos KK Loverdos <loverdos@gmail.com>
57 class UserStateComputations extends Loggable {
58 def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
59 if(!imEvent.isCreateUser) {
60 throw new AquariumInternalError(
61 "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
64 val userID = imEvent.userID
65 val userCreationMillis = imEvent.occurredMillis
66 val now = TimeHelpers.nowMillis()
75 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
78 LatestResourceEventsSnapshot(List(), now),
81 IMStateSnapshot(imEvent, now),
82 CreditSnapshot(credits, now),
83 AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
84 OwnedResourcesSnapshot(Nil, now),
90 def createInitialUserState(userID: String,
91 userCreationMillis: Long,
94 roleNames: List[String] = List(),
95 agreementName: String = DSLAgreement.DefaultAgreementName) = {
96 val now = userCreationMillis
105 ImplicitlyIssuedResourceEventsSnapshot(List(), now),
108 LatestResourceEventsSnapshot(List(), now),
116 isActive, roleNames.headOption.getOrElse("default"),
118 IMEventModel.EventTypeNames.create, Map()),
121 CreditSnapshot(credits, now),
122 AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
123 OwnedResourcesSnapshot(Nil, now),
125 InitialUserStateSetup
129 def createInitialUserStateFrom(us: UserState): UserState = {
130 createInitialUserState(
131 us.imStateSnapshot.imEvent,
132 us.creditsSnapshot.creditAmount,
133 us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
136 def findUserStateAtEndOfBillingMonth(userId: String,
137 billingMonthInfo: BillingMonthInfo,
138 storeProvider: StoreProvider,
139 currentUserState: UserState,
140 defaultResourcesMap: DSLResourcesMap,
141 accounting: Accounting,
142 algorithmCompiler: CostPolicyAlgorithmCompiler,
143 calculationReason: UserStateChangeReason,
144 clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = {
146 val clog = ContextualLogger.fromOther(
149 "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
152 def doCompute: Maybe[UserState] = {
153 doFullMonthlyBilling(
165 val userStateStore = storeProvider.userStateStore
166 val resourceEventStore = storeProvider.resourceEventStore
168 val userCreationMillis = currentUserState.userCreationMillis
169 val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
170 val billingMonthStartMillis = billingMonthInfo.startMillis
171 val billingMonthStopMillis = billingMonthInfo.stopMillis
173 if(billingMonthStopMillis < userCreationMillis) {
174 // If the user did not exist for this billing month, piece of cake
175 clog.debug("User did not exist before %s", userCreationDateCalc)
177 // NOTE: Reason here will be: InitialUserStateSetup$
178 val initialUserState0 = createInitialUserStateFrom(currentUserState)
179 val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
181 clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
186 // Ask DB cache for the latest known user state for this billing period
187 val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
189 billingMonthInfo.year,
190 billingMonthInfo.month) match {
192 case Some(latestUserState) ⇒
198 latestUserStateM match {
200 // Not found, must compute
201 clog.debug("No user state found from cache, will have to (re)compute")
202 val result = doCompute
206 case failed @ Failed(e) ⇒
207 clog.warn("Failure while quering cache for user state: %s", failed)
211 case Just(latestUserState) ⇒
212 // Found a "latest" user state but need to see if it is indeed the true and one latest.
213 // For this reason, we must count the events again.
214 val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
215 val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
217 billingMonthStartMillis,
218 billingMonthStopMillis)
220 actualOOSEventsCounterM match {
222 val errMsg = "No counter computed for out of sync events. Should at least be zero."
224 val result = Failed(new AquariumException(errMsg))
228 case failed @ Failed(_) ⇒
229 clog.warn("Failure while querying for out of sync events: %s", failed)
233 case Just(actualOOSEventsCounter) ⇒
234 val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
238 // NOTE: Keep the caller's calculation reason
239 Just(latestUserState.copyForChangeReason(calculationReason))
241 // We had more, so must recompute
244 "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
245 val result = doCompute
251 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
253 val result = Failed(new AquariumException(errMsg))
263 def rcDebugInfo(rcEvent: ResourceEventModel) = {
264 rcEvent.toDebugString(false)
268 def processResourceEvent(startingUserState: UserState,
269 userStateWorker: UserStateWorker,
270 currentResourceEvent: ResourceEventModel,
271 policyStore: PolicyStore,
272 stateChangeReason: UserStateChangeReason,
273 billingMonthInfo: BillingMonthInfo,
274 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
275 algorithmCompiler: CostPolicyAlgorithmCompiler,
276 clogOpt: Option[ContextualLogger] = None): UserState = {
278 val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
280 var _workingUserState = startingUserState
282 val theResource = currentResourceEvent.safeResource
283 val theInstanceId = currentResourceEvent.safeInstanceId
284 val theValue = currentResourceEvent.value
286 val accounting = userStateWorker.accounting
287 val resourcesMap = userStateWorker.resourcesMap
289 val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
290 clog.begin(currentResourceEventDebugInfo)
292 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
294 // Ignore the event if it is not billable (but still record it in the "previous" stuff).
295 // But to make this decision, first we need the resource definition (and its cost policy).
296 val dslResourceOpt = resourcesMap.findResource(theResource)
297 dslResourceOpt match {
298 // We have a resource (and thus a cost policy)
299 case Some(dslResource) ⇒
300 val costPolicy = dslResource.costPolicy
301 clog.debug("Cost policy %s for %s", costPolicy, dslResource)
302 val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
304 // The resource event is not billable
305 clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
307 // The resource event is billable
308 // Find the previous event.
309 // This is (potentially) needed to calculate new credit amount and new resource instance amount
310 val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
311 clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
313 val havePreviousResourceEvent = previousResourceEventM.isJust
314 val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
315 if(needPreviousResourceEvent && !havePreviousResourceEvent) {
316 // This must be the first resource event of its kind, ever.
317 // TODO: We should normally check the DB to verify the claim (?)
318 clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
319 userStateWorker.updateIgnored(currentResourceEvent)
321 val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
322 val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
323 val oldCredits = _workingUserState.creditsSnapshot.creditAmount
325 // A. Compute new resource instance accumulating amount
326 val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
328 clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
330 // B. Compute new wallet entries
331 clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
332 val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
334 // clog.debug("Computing full chargeslots")
335 val fullChargeslotsM = accounting.computeFullChargeslots(
336 previousResourceEventM,
337 currentResourceEvent,
349 // We have the chargeslots, let's associate them with the current event
350 fullChargeslotsM match {
351 case Just((referenceTimeslot, fullChargeslots)) ⇒
352 if(fullChargeslots.length == 0) {
353 // At least one chargeslot is required.
354 throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
356 clog.debugSeq("fullChargeslots", fullChargeslots, 0)
358 // C. Compute new credit amount (based on the charge slots)
359 val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
360 val newCredits = oldCredits - newCreditsDiff
362 if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
363 val newWalletEntry = NewWalletEntry(
364 userStateWorker.userId,
368 TimeHelpers.nowMillis(),
370 billingMonthInfo.year,
371 billingMonthInfo.month,
372 if(havePreviousResourceEvent)
373 List(currentResourceEvent, justForSure(previousResourceEventM).get)
375 List(currentResourceEvent),
378 currentResourceEvent.isSynthetic
380 clog.debug("New %s", newWalletEntry)
382 walletEntriesBuffer += newWalletEntry
384 clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
387 _workingUserState = _workingUserState.copy(
388 creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
389 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
390 totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
394 // At least one chargeslot is required.
395 throw new AquariumException("No chargeslots computed")
397 case failed@Failed(e) ⇒
398 throw new AquariumException(e, "Error computing chargeslots")
403 // After processing, all events billable or not update the previous state
404 userStateWorker.updatePrevious(currentResourceEvent)
406 _workingUserState = _workingUserState.copy(
407 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
410 // We do not have a resource (and thus, no cost policy)
412 // Now, this is a matter of politics: what do we do if no policy was found?
413 clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
414 } // dslResourceOpt match
416 clog.end(currentResourceEventDebugInfo)
421 def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
422 startingUserState: UserState,
423 userStateWorker: UserStateWorker,
424 policyStore: PolicyStore,
425 stateChangeReason: UserStateChangeReason,
426 billingMonthInfo: BillingMonthInfo,
427 walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
428 algorithmCompiler: CostPolicyAlgorithmCompiler,
429 clogOpt: Option[ContextualLogger] = None): UserState = {
431 var _workingUserState = startingUserState
433 for(currentResourceEvent <- resourceEvents) {
435 _workingUserState = processResourceEvent(
438 currentResourceEvent,
452 def doFullMonthlyBilling(userId: String,
453 billingMonthInfo: BillingMonthInfo,
454 storeProvider: StoreProvider,
455 currentUserState: UserState,
456 defaultResourcesMap: DSLResourcesMap,
457 accounting: Accounting,
458 algorithmCompiler: CostPolicyAlgorithmCompiler,
459 calculationReason: UserStateChangeReason = NoSpecificChangeReason,
460 clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = Maybe {
463 val clog = ContextualLogger.fromOther(
466 "doFullMonthlyBilling(%s)", billingMonthInfo)
469 val clogSome = Some(clog)
471 val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
473 billingMonthInfo.previousMonth,
479 calculationReason.forPreviousBillingMonth,
483 if(previousBillingMonthUserStateM.isNoVal) {
484 throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
486 if(previousBillingMonthUserStateM.isFailed) {
487 throw failedForSure(previousBillingMonthUserStateM).exception
490 val startingUserState = justForSure(previousBillingMonthUserStateM).get
492 val userStateStore = storeProvider.userStateStore
493 val resourceEventStore = storeProvider.resourceEventStore
494 val policyStore = storeProvider.policyStore
496 val billingMonthStartMillis = billingMonthInfo.startMillis
497 val billingMonthEndMillis = billingMonthInfo.stopMillis
499 // Keep the working (current) user state. This will get updated as we proceed with billing for the month
500 // specified in the parameters.
501 // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
502 var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
504 val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
506 userStateWorker.debugTheMaps(clog)(rcDebugInfo)
508 // First, find and process the actual resource events from DB
509 val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
511 billingMonthStartMillis,
512 billingMonthEndMillis)
514 val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
516 _workingUserState = processResourceEvents(
517 allResourceEventsForMonth,
528 // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
529 // ... in order to generate an implicit ON later
530 val (specialEvents, theirImplicitEnds) = userStateWorker.
531 findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
532 if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
534 clog.debug("Process implicitly issued events")
535 clog.debugSeq("specialEvents", specialEvents, 0)
536 clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
539 // Now, the previous and implicitly started must be our base for the following computation, so we create an
540 // appropriate worker
541 val specialUserStateWorker = UserStateWorker(
542 userStateWorker.userId,
543 LatestResourceEventsWorker.fromList(specialEvents),
544 ImplicitlyIssuedResourceEventsWorker.Empty,
545 IgnoredFirstResourceEventsWorker.Empty,
546 userStateWorker.accounting,
547 userStateWorker.resourcesMap
550 _workingUserState = processResourceEvents(
553 specialUserStateWorker,
562 val lastUpdateTime = TimeHelpers.nowMillis()
564 _workingUserState = _workingUserState.copy(
565 implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
566 latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
567 stateChangeCounter = _workingUserState.stateChangeCounter + 1,
568 parentUserStateId = startingUserState.idOpt,
569 newWalletEntries = newWalletEntries.toList
572 clog.debug("calculationReason = %s", calculationReason)
574 if(calculationReason.shouldStoreUserState) {
575 val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
576 storedUserStateM match {
577 case Just(storedUserState) ⇒
578 clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
579 _workingUserState = storedUserState
581 clog.warn("Could not store %s", _workingUserState)
582 case failed @ Failed(e) ⇒
583 clog.error(e, "Could not store %s", _workingUserState)
587 clog.debug("RETURN %s", _workingUserState)
594 * A helper object holding intermediate state/results during resource event processing.
596 * @param previousResourceEvents
597 * This is a collection of all the latest resource events.
598 * We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
599 * ones. Will be updated on processing the next resource event.
601 * @param implicitlyIssuedStartEvents
602 * The implicitly issued resource events at the beginning of the billing period.
604 * @param ignoredFirstResourceEvents
605 * The resource events that were first (and unused) of their kind.
607 * @author Christos KK Loverdos <loverdos@gmail.com>
609 case class UserStateWorker(userId: String,
610 previousResourceEvents: LatestResourceEventsWorker,
611 implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
612 ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
613 accounting: Accounting,
614 resourcesMap: DSLResourcesMap) {
617 * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
618 * events and b) the explicit previous resource events. If the event is found, it is removed from the
621 * If the event is not found, then this must be for a new resource instance.
622 * (and probably then some `zero` resource event must be implied as the previous one)
628 def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
629 // implicitly issued events are checked first
630 implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
631 case just @ Just(_) ⇒
634 // explicit previous resource events are checked second
635 previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
636 case just @ Just(_) ⇒
646 def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
647 ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
650 def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
651 previousResourceEvents.updateResourceEvent(resourceEvent)
654 def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
655 if(previousResourceEvents.size > 0) {
656 val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
657 clog.debugMap("previousResourceEvents", map, 0)
659 if(implicitlyIssuedStartEvents.size > 0) {
660 val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
661 clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
663 if(ignoredFirstResourceEvents.size > 0) {
664 val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
665 clog.debugMap("ignoredFirstResourceEvents", map, 0)
670 // def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
671 // val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
673 // buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
674 // buffer ++= previousResourceEvents.latestEventsMap
676 // buffer.valuesIterator.toList
680 * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
681 * end events along with those implicitly issued events. Before returning, remove the events that generated the
682 * implicit ends from the internal state of this instance.
684 * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
686 def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
687 ): (List[ResourceEventModel], List[ResourceEventModel]) = {
688 val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
689 val checkSet = mutable.Set[ResourceEventModel]()
691 def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
692 val resourceEvents = map.valuesIterator
694 resourceEvent <- resourceEvents
695 dslResource <- resourcesMap.findResource(resourceEvent.safeResource)
696 costPolicy = dslResource.costPolicy
698 if(costPolicy.supportsImplicitEvents) {
699 if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
700 val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
702 if(!checkSet.contains(resourceEvent)) {
703 checkSet.add(resourceEvent)
704 buffer append ((resourceEvent, implicitEnd))
708 map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
714 doItFor(previousResourceEvents.latestEventsMap) // we give priority for previous
715 doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
717 (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
721 object UserStateWorker {
722 def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
725 userState.latestResourceEventsSnapshot.toMutableWorker,
726 userState.implicitlyIssuedSnapshot.toMutableWorker,
727 IgnoredFirstResourceEventsWorker.Empty,