private[this] def _configurator: Configurator = Configurator.MasterConfigurator // Shorthand accessor for Configurator.MasterConfigurator
- private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = { // Handles one resource event; when checkForOlderEvents is set, older unprocessed events are handled first.
- if(checkForOlderEvents) {
- DEBUG("Checking for events older than %s" format resourceEvent)
- processOlderResourceEvents(resourceEvent) // catch up on earlier events before processing the current one
- }
-
- justProcessTheResourceEvent(resourceEvent, "ACTUAL") // "ACTUAL" is only the log label for the event that was received
- }
-
-
- /**
- * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
- *
- * NOTE: this is currently a stub. It ignores both arguments and always returns `Nil`.
- *
- * @param rcEvent the "onoff" resource event whose predecessors are wanted (unused for now)
- * @param policy the policy the event is interpreted under (unused for now)
- * @return always the empty list in the current implementation
- */
- def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
- Nil
- }
-
- def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { // Counterpart for events that are not "onoff"; currently a stub that ignores its arguments.
- Nil // always the empty list for now
- }
-
- /**
- * Find resource events that precede the given one and are unprocessed.
- *
- * Dispatches on `rcEvent.isOnOffEvent(policy)` to one of two specialised finders. Both of them
- * currently return `Nil`, so the result is always the empty list.
- *
- * @param rcEvent the resource event whose unprocessed predecessors are wanted
- * @param policy the policy used to decide whether `rcEvent` is an "onoff" event
- * @return the older unprocessed events, as reported by the selected finder
- */
- private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
- if(rcEvent.isOnOffEvent(policy)) {
- findOlderResourceEventsForOnOff(rcEvent, policy)
- } else {
- findOlderResourceEventsForOther(rcEvent, policy)
- }
- }
-
- /**
- * Find and process older resource events.
- *
- * Older resource events are found based on the latest credit calculation, that is the latest
- * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
- * have been calculated for these resource events and start from there.
- *
- * Every selected event is processed with the "OLDER" log label. When the wallet store reports no
- * latest entries (`NoVal`) nothing is processed; a `Failed` lookup is only logged.
- *
- * @param resourceEvent the event that triggered the check; its `userId` must equal `_userId`
- * (asserted) and the event itself is excluded from the selection.
- */
- private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
- assert(_userId == resourceEvent.userId)
- val rceId = resourceEvent.id
- val userId = resourceEvent.userId
- val resourceEventStore = _configurator.resourceEventStore
- val walletEntriesStore = _configurator.walletStore
-
- // 1. Find latest wallet entry
- val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
- latestWalletEntriesM match {
- case Just(latestWalletEntries) ⇒
- // The time on which we base the selection of the older events
- val selectionTime = latestWalletEntries.head.occurredMillis // NOTE(review): .head throws on an empty collection; confirm the store never yields Just of an empty result
-
- // 2. Now find all resource events past the time of the latest wallet entry.
- // These events have not been processed, except probably those ones
- // that have the same `occurredMillis` with `selectionTime`
- val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
-
- // 3. Filter out those resource events for which no wallet entry has actually been
- // produced.
- val rcEventsToProcess = for {
- oldRCEvent <- oldRCEvents
- oldRCEventId = oldRCEvent.id
- latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId) // NOTE(review): the nested generator yields oldRCEvent once per wallet entry that passes the guard, so one event may be selected several times; confirm this is intended
- } yield {
- oldRCEvent
- }
-
- DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
-
- for {
- rcEventToProcess <- rcEventsToProcess
- } {
- justProcessTheResourceEvent(rcEventToProcess, "OLDER") // "OLDER" is the log label for replayed events
- }
- case NoVal ⇒
- DEBUG("No events to process older than %s".format(resourceEvent))
- case Failed(e, m) ⇒
- ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage)) // the failure is logged and otherwise ignored
- }
- }
-
- private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = { // Stores every finalized wallet entry, and the non-finalized ones too when allowNonFinalized is true.
- val walletEntriesStore = _configurator.walletStore
- for(walletEntry <- walletEntries) {
- val allow = walletEntry.finalized || allowNonFinalized // finalized entries are always stored
- if(allow) {
- walletEntriesStore.storeWalletEntry(walletEntry)
- }
- }
- }
-
- private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = { // Sums the values (converted to Double) of the finalized wallet entries.
- val newCredits = for {
- walletEntry <- walletEntries if(walletEntry.finalized) // non-finalized entries do not contribute to the sum
- } yield walletEntry.value.toDouble
-
- newCredits.sum // 0.0 when there are no finalized entries
- }
-
- /**
- * Process the resource event as if nothing else matters. Just do it.
- *
- * Steps, as implemented below: look up the resource definition in the current policy, derive the
- * instance id, compute the updated owned-resources snapshot, charge the event under the user's
- * agreement, store the resulting wallet entries and, only when wallet entries were produced,
- * replace `_userState` with the new credits and resources.
- *
- * The method returns `Unit`: the `Failed` values built in the error branches are results of
- * `match` expressions whose value is discarded, so they are not returned to the caller.
- *
- * @param ev the resource event to process
- * @param logLabel tag included in the initial debug message (callers visible here pass "ACTUAL" or "OLDER")
- */
- private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
- val start = System.currentTimeMillis
- DEBUG("Processing [%s] %s".format(logLabel, ev))
-
- // Initially, the user state (regarding resources) is empty.
- // So we have to compensate for both a totally empty resource state
- // and the update with new values.
-
- // 1. Find the resource definition
- val policy = Policy.policy
- policy.findResource(ev.resource) match {
- case Some(resource) ⇒
- // 2. Get the instance id and value for the resource
- val instanceIdM = resource match {
- // 2.1 If it is complex, from the details map, get the field which holds the instanceId
- case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
- ev.details.get(descriminatorField) match {
- case Some(instanceId) ⇒
- Just(instanceId)
- case None ⇒
- // We should have some value under this key here....
- Failed(throw new AccountingException("")) //TODO: See what to do here. NOTE(review): unless Failed takes its argument by name, the throw fires before Failed is constructed, so an exception propagates instead of a Failed value; confirm
- }
- // 2.2 If it is simple, ...
- case DSLSimpleResource(name, unit, costPolicy) ⇒
- // ... by convention, the instanceId of a simple resource is just "1"
- // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
- Just("1")
- }
-
- // 3. Did we get a valid instanceId?
- instanceIdM match {
- // 3.1 Yes, time to get/update the current state
- case Just(instanceId) ⇒
- val oldOwnedResources = _userState.ownedResources
-
- // A. First state diff: the modified resource value
- val StateChangeMillis = TimeHelpers.nowMillis
- val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources.
- addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis)
- val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime) // falls back to the new instance's snapshot time when there was no previous instance
-
- // Calculate the wallet entries generated from this resource event
- _userState.maybeDSLAgreement match {
- case Just(agreement) ⇒
- val walletEntriesM = chargeEvent(ev, agreement, ev.value,
- new Date(previousRCUpdateTime),
- findRelatedEntries(resource, ev.getInstanceId(policy)))
- walletEntriesM match {
- case Just(walletEntries) ⇒
- _storeWalletEntries(walletEntries, true) // true = also store entries that are not finalized
-
- // B. Second state diff: the new credits
- val newCreditsSum = _calcNewCreditSum(walletEntries)
- val oldCredits = _userState.safeCredits.data
- val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
-
- // Finally, the userState change
- DEBUG("Credits = %s".format(this._userId, newCredits)) // NOTE(review): one %s but two arguments, so this logs _userId and not newCredits
- DEBUG("Resources = %s".format(this._userId, newOwnedResources)) // NOTE(review): same mismatch, this logs _userId and not newOwnedResources
- this._userState = this._userState.copy(
- credits = newCredits,
- ownedResources = newOwnedResources
- )
- case NoVal ⇒
- DEBUG("No wallet entries generated for %s".format(ev))
- case failed @ Failed(e, m) ⇒
- failed // value is discarded because the method returns Unit
- }
-
- case NoVal ⇒
- Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
- case failed @ Failed(e, m) ⇒
- failed
- }
- // 3.2 No, no luck, this is an error
- case NoVal ⇒
- Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
- case failed @ Failed(e, m) ⇒
- failed
- }
- // No resource definition found, this is an error
- case None ⇒ // Policy.policy.findResource(ev.resource)
- Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
- }
-
- DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start)) // elapsed wall-clock time, logged on every path that does not throw
- }
-
private[this] def processCreateUser(event: UserEvent): Unit = { // Handles a user-creation event; the lines visible here only log it.
val userId = event.userId // NOTE(review): not referenced in the visible lines; confirm whether it is still needed
DEBUG("Creating user from state %s", event)
}
/**
- * Try to load from the DB the latest known info (snapshot data) for the given user.
- */
- private[this] def findUserState(userId: String): Maybe[UserState] = { // Looks up the stored state for userId; the result of the store lookup is returned unchanged.
- val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId) // pure delegation to the user state store
- }
-
- /**
* Tries to make sure that the internal user state exists.
*
* May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
*
*/
private[this] def ensureUserState(): Unit = {
- /*if(null eq this._userState) {
- findUserState(this._userId) match {
- case Just(userState) ⇒
- DEBUG("Loaded user state %s from DB", userState)
- //TODO: May be out of sync with the event store, rebuild it here
- this._userState = userState
- rebuildState(this._userState.oldestSnapshotTime)
- case Failed(e, m) ⇒
- ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
- case NoVal ⇒
- //TODO: Rebuild actor state here.
- rebuildState(0)
- WARN("Request for unknown (to Aquarium) user")
- }
- }*/
-
if (_userState == null)
rebuildState(0)
else
}
case m @ RequestUserBalance(userId, timestamp) ⇒
- /*if(this._userId != userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- // TODO: throw an exception here
- } else {
- // This is the big party.
- // Get the user state, if it exists and make sure it is not stale.
- ensureUserState()
-
- // Do we have a user state?
- if(_userState ne null) {
- // Yep, we do. See what there is inside it.
- val credits = _userState.credits
- val creditsTimestamp = credits.snapshotTime
-
- // Check if data is stale
- if(creditsTimestamp + _timestampTheshold > timestamp) {
- // No, it's OK
- self reply UserResponseGetBalance(userId, credits.data)
- } else {
- // Yep, data is stale and must recompute balance
- // FIXME: implement
- ERROR("FIXME: Should have computed a new value for %s".format(credits))
- self reply UserResponseGetBalance(userId, credits.data)
- }
- } else {
- // No user state exists. This is an error.
- val errMsg = "Could not load user state for %s".format(m)
- ERROR(errMsg)
- self reply ResponseUserBalance(userId, 0, Some(errMsg))
- }*/
- if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
- calcWalletEntries()
- self reply UserResponseGetBalance(userId, _userState.credits.data)
- //}
+ if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+ calcWalletEntries()
+ self reply UserResponseGetBalance(userId, _userState.credits.data)
case m @ UserRequestGetState(userId, timestamp) ⇒
if(this._userId != userId) {