/**
 * Handles a single resource event.
 *
 * When `checkForOlderEvents` is set, `processOlderResourceEvents` is run for the event first.
 * The event itself is then always processed, under the "ACTUAL" log label.
 *
 * @param resourceEvent       the event to process
 * @param checkForOlderEvents whether older events must be looked up and processed beforehand
 */
private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
  // Catch up on older events first, if requested
  if(checkForOlderEvents) {
    val checkingMessage = "Checking for events older than %s".format(resourceEvent)
    DEBUG(checkingMessage)
    processOlderResourceEvents(resourceEvent)
  }

  // The event at hand is processed unconditionally
  val actualLabel = "ACTUAL"
  justProcessTheResourceEvent(resourceEvent, actualLabel)
}
-
-
/**
 * Given an "onoff" event, locate all unprocessed resource events that precede it.
 *
 * Placeholder implementation: no lookup is performed and the result is always the empty list.
 */
def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] =
  List.empty[ResourceEvent]
-
/**
 * Given an event that is not an "onoff" one, locate all unprocessed resource events that precede it.
 *
 * Placeholder implementation: no lookup is performed and the result is always the empty list.
 */
def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] =
  List.empty[ResourceEvent]
-
/**
 * Finds the unprocessed resource events that precede the given one.
 *
 * The lookup is delegated to `findOlderResourceEventsForOnOff` when the event is an "onoff"
 * event under the given policy, and to `findOlderResourceEventsForOther` in every other case.
 */
private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
  val isOnOff = rcEvent.isOnOffEvent(policy)

  if(isOnOff) findOlderResourceEventsForOnOff(rcEvent, policy)
  else findOlderResourceEventsForOther(rcEvent, policy)
}
-
/**
 * Find and process older resource events.
 *
 * Older resource events are found based on the latest credit calculation, that is the latest
 * wallet entries of the user. Resource events stored after the time of the first of those
 * wallet entries, and from which none of those wallet entries originates, are considered
 * unprocessed; each of them is processed exactly once, under the "OLDER" log label.
 * The given `resourceEvent` itself is never processed by this method.
 *
 * If no wallet entries are found (or the returned collection is empty) nothing is processed.
 * A failure to retrieve the wallet entries is logged and otherwise ignored.
 *
 * @param resourceEvent the event relative to which older events are searched; must belong to
 *                      the user this instance is responsible for (checked with an assertion)
 */
private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
  assert(_userId == resourceEvent.userId)
  val rceId = resourceEvent.id
  val userId = resourceEvent.userId
  val resourceEventStore = _configurator.resourceEventStore
  val walletEntriesStore = _configurator.walletStore

  // 1. Find the latest wallet entries
  walletEntriesStore.findLatestUserWalletEntries(userId) match {
    case Just(latestWalletEntries) ⇒
      // `headOption` instead of `head`: an empty result must not blow up with an exception
      latestWalletEntries.headOption match {
        case Some(latestWalletEntry) ⇒
          // The time on which we base the selection of the older events
          val selectionTime = latestWalletEntry.occurredMillis

          // 2. Now find all resource events past the time of the latest wallet entry.
          //    These events have not been processed, except probably those ones
          //    that have the same `occurredMillis` with `selectionTime`
          val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)

          // 3. Keep only the resource events for which no wallet entry has actually been produced.
          //    Each event is tested against *all* the latest wallet entries at once, so that it is
          //    selected at most once and only when none of the entries originates from it.
          val rcEventsToProcess = oldRCEvents.filter { oldRCEvent ⇒
            val oldRCEventId = oldRCEvent.id
            rceId != oldRCEventId && !latestWalletEntries.exists(_.fromResourceEvent(oldRCEventId))
          }

          DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))

          for(rcEventToProcess <- rcEventsToProcess) {
            justProcessTheResourceEvent(rcEventToProcess, "OLDER")
          }

        case None ⇒
          // No wallet entry to base the selection on
          DEBUG("No events to process older than %s".format(resourceEvent))
      }

    case NoVal ⇒
      DEBUG("No events to process older than %s".format(resourceEvent))

    case Failed(e, m) ⇒
      ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
  }
}
-
/**
 * Persists wallet entries to the wallet store obtained from the configurator.
 *
 * A finalized entry is always stored; a non-finalized one is stored only when
 * `allowNonFinalized` is set. Entries are stored in list order.
 *
 * @param walletEntries     the candidate entries
 * @param allowNonFinalized whether entries that are not finalized are stored as well
 */
private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = {
  val store = _configurator.walletStore

  walletEntries.foreach { entry ⇒
    if(entry.finalized || allowNonFinalized) {
      store.storeWalletEntry(entry)
    }
  }
}
-
/**
 * Sums up the values of the finalized wallet entries, each converted to `Double`.
 * Non-finalized entries do not contribute to the result.
 *
 * @param walletEntries the wallet entries to inspect
 * @return the sum of the values of the finalized entries
 */
private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
  val finalizedValues = walletEntries.collect {
    case entry if entry.finalized ⇒ entry.value.toDouble
  }

  finalizedValues.sum
}
-
/**
 * Process the resource event as if nothing else matters. Just do it.
 *
 * Steps:
 *  1. Look up the definition of the event's resource in the current policy.
 *  2. Determine the resource instance id: for a complex resource it is read from the event
 *     details under the resource's discriminator field; for a simple resource it is "1".
 *  3. Update the owned resources snapshot, charge the event under the user's agreement,
 *     store the generated wallet entries and update `_userState` with the new credits and
 *     the new owned resources.
 *
 * Problems along the way (unknown resource, missing instance id, missing or failed agreement,
 * failed charging) are reported with `ERROR` and `_userState` is left as it was; they are not
 * thrown to the caller. The elapsed time is always logged at the end.
 *
 * @param ev       the resource event to process
 * @param logLabel a tag that only appears in the debug line announcing the processing
 */
private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
  val start = System.currentTimeMillis
  DEBUG("Processing [%s] %s".format(logLabel, ev))

  // Initially, the user state (regarding resources) is empty.
  // So we have to compensate for both a totally empty resource state
  // and the update with new values.

  // 1. Find the resource definition
  val policy = Policy.policy
  policy.findResource(ev.resource) match {
    case Some(resource) ⇒
      // 2. Get the instance id for the resource
      val instanceIdOpt = resource match {
        // 2.1 If it is complex, from the details map, get the field which holds the instanceId.
        //     A missing field yields None and is reported below, instead of throwing from here.
        case DSLComplexResource(_, _, _, descriminatorField) ⇒
          ev.details.get(descriminatorField)
        // 2.2 If it is simple, by convention the instanceId is just "1"
        // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
        case DSLSimpleResource(_, _, _) ⇒
          Some("1")
      }

      // 3. Did we get a valid instanceId?
      instanceIdOpt match {
        // 3.1 Yes, time to get/update the current state
        case Some(instanceId) ⇒
          val oldOwnedResources = _userState.ownedResources

          // A. First state diff: the modified resource value
          val stateChangeMillis = TimeHelpers.nowMillis
          val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources.
            addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis)
          // With no previous snapshot of this instance, fall back to the time of the new one
          val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime)

          // Calculate the wallet entries generated from this resource event
          _userState.maybeDSLAgreement match {
            case Just(agreement) ⇒
              val walletEntriesM = chargeEvent(ev, agreement, ev.value,
                new Date(previousRCUpdateTime),
                findRelatedEntries(resource, ev.getInstanceId(policy)))

              walletEntriesM match {
                case Just(walletEntries) ⇒
                  _storeWalletEntries(walletEntries, true)

                  // B. Second state diff: the new credits
                  val newCreditsSum = _calcNewCreditSum(walletEntries)
                  val oldCredits = _userState.safeCredits.data
                  val newCredits = CreditSnapshot(oldCredits + newCreditsSum, stateChangeMillis)

                  // Finally, the userState change.
                  // Both arguments get a placeholder; with a single one only the user id was printed.
                  DEBUG("Credits for user %s = %s".format(this._userId, newCredits))
                  DEBUG("Resources for user %s = %s".format(this._userId, newOwnedResources))
                  this._userState = this._userState.copy(
                    credits = newCredits,
                    ownedResources = newOwnedResources
                  )

                case NoVal ⇒
                  DEBUG("No wallet entries generated for %s".format(ev))

                case Failed(e, m) ⇒
                  // Charging failed: report it, the user state stays untouched
                  ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
              }

            case NoVal ⇒
              ERROR("No agreement snapshot found for user %s".format(this._userId))

            case Failed(e, m) ⇒
              ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
          }

        // 3.2 No, no luck, this is an error
        case None ⇒
          ERROR("No instanceId for resource %s of user %s".format(resource, this._userId))
      }

    // No resource definition found, this is an error
    case None ⇒
      ERROR("No resource %s found for user %s".format(ev.resource, this._userId))
  }

  DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
}
-