+ /**
+  * Converts an Astakos "add credits" IM event into a regular resource event
+  * and pushes it through the normal resource-event processing path
+  * (`onProcessResourceEvent`).
+  *
+  * The credit amount is taken from the event details under
+  * `IMEventModel.DetailsNames.credits`, parsed as an integer and then widened
+  * to a `Double`.
+  *
+  * NOTE(review): the lookup and the parse are unchecked. Presumably `details`
+  * is a `Map[String, String]`, in which case a missing key or a non-integer
+  * value makes this method throw - confirm that callers are fine with that.
+  *
+  * @param imEvent the IM event that carries the credits to add
+  */
+ def onHandleAddCreditsEvent(imEvent: IMEventModel): Unit = {
+   val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
+
+   // NOTE(review): the event type is deliberately passed twice below; presumably
+   // it fills two distinct slots of StdResourceEvent (e.g. resource and
+   // instance id) - confirm against the StdResourceEvent constructor.
+   val resourceEvent = new StdResourceEvent(
+     imEvent.id,
+     imEvent.occurredMillis,
+     imEvent.receivedMillis,
+     imEvent.userID,
+     imEvent.clientID,
+     imEvent.eventType,
+     imEvent.eventType,
+     credits,
+     imEvent.eventVersion,
+     imEvent.details
+   )
+
+   onProcessResourceEvent(new ProcessResourceEvent(resourceEvent))
+ }
+
+ def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+ val rcEvent = event.rcEvent
+
+ if(!shouldProcessResourceEvents) {
+ // This means the user has not been created (at least, as far as Aquarium is concerned).
+ // So, we do not process any resource event
+ DEBUG("Not processing %s", rcEvent.toJsonString)
+ logSeparator()
+
+ return
+ }
+
+ // Since the latest resource event per resource is recorded in the user state,
+ // we do not need to query the store. Just query the in-memory state.
+ // Note: This is a similar situation with the first IMEvent received right after the user
+ // actor is created.
+ if(this._latestResourceEventID == rcEvent.id) {
+ INFO("Ignoring first %s", rcEvent.toDebugString)
+ logSeparator()
+
+ return
+ }
+
+ val now = TimeHelpers.nowMillis()
+ val currentResourcesMap = aquarium.currentResourceTypesMap
+ val chargingReason = RealtimeChargingReason(None, now)
+
+ val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
+ val nowYear = nowBillingMonthInfo.year
+ val nowMonth = nowBillingMonthInfo.month
+
+ val eventOccurredMillis = rcEvent.occurredMillis
+ val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+ val eventYear = eventBillingMonthInfo.year
+ val eventMonth = eventBillingMonthInfo.month
+
+ /**
+  * Rebuilds the working user state by replaying the charging of the current
+  * billing month (`nowBillingMonthInfo`), then calls
+  * `updateLatestResourceEventIDFrom` for the event being processed.
+  */
+ def computeBatch(): Unit = {
+   DEBUG("Going for out of sync charging")
+
+   // The event may be out-of-sync, so replay up to the later of the current
+   // time and the time the event occurred.
+   // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
+   val replayUpToMillis = now.max(eventOccurredMillis)
+
+   this._workingUserState = chargingService.replayMonthChargingUpTo(
+     nowBillingMonthInfo,
+     replayUpToMillis,
+     this._userStateBootstrap,
+     currentResourcesMap,
+     chargingReason,
+     stdUserStateStoreFunc
+   )
+
+   updateLatestResourceEventIDFrom(rcEvent)
+ }