X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/77ea8fd035b427f8a799ed8bdb1502418cea1e33..946e3a9be50e0a04c92afd8acd5304d9dcd9496b:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 46f5824..065107d 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -37,20 +37,29 @@ package gr.grnet.aquarium.actor package service package user -import gr.grnet.aquarium.actor._ - -import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent} -import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded} -import gr.grnet.aquarium.util.date.TimeHelpers -import gr.grnet.aquarium.event.model.im.IMEventModel -import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} -import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} import gr.grnet.aquarium.AquariumInternalError +import gr.grnet.aquarium.actor.message.GetUserBalanceRequest +import gr.grnet.aquarium.actor.message.GetUserBalanceResponse +import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData +import gr.grnet.aquarium.actor.message.GetUserBillRequest +import gr.grnet.aquarium.actor.message.GetUserBillResponse +import gr.grnet.aquarium.actor.message.GetUserBillResponseData +import gr.grnet.aquarium.actor.message.GetUserStateRequest +import gr.grnet.aquarium.actor.message.GetUserStateResponse +import gr.grnet.aquarium.actor.message.GetUserWalletRequest +import gr.grnet.aquarium.actor.message.GetUserWalletResponse +import gr.grnet.aquarium.actor.message.GetUserWalletResponseData +import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded +import gr.grnet.aquarium.actor.message.config.InitializeUserActorState +import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap} import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.charging.state.UserStateBootstrap -import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel} -import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason} -import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement} +import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg} +import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers} +import gr.grnet.aquarium.service.event.BalanceEvent +import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} +import gr.grnet.aquarium.policy.{ResourceType, PolicyModel} +import gr.grnet.aquarium.charging.bill.BillEntryMsg /** * @@ -59,11 +68,11 @@ import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreemen class UserActor extends ReflectiveRoleableActor { private[this] var _userID: String = "" - private[this] var _workingUserState: WorkingUserState = _ - private[this] var _userCreationIMEvent: IMEventModel = _ - private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory - private[this] var _latestIMEventID: String = "" - private[this] var _latestResourceEventID: String = "" + private[this] var _userState: UserStateModel = _ + private[this] var _userCreationIMEvent: IMEventMsg = _ + private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _ + private[this] var _latestIMEventOriginalID: String = "" + private[this] var _latestResourceEventOriginalID: String = "" private[this] var _userStateBootstrap: UserStateBootstrap = _ def unsafeUserID = { @@ -96,7 +105,7 @@ class UserActor extends ReflectiveRoleableActor { private[this] def chargingService = aquarium.chargingService - private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ { + private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ { aquarium.userStateStore.insertUserState(userState) } @@ -112,49 +121,51 @@ class UserActor extends ReflectiveRoleableActor { } @inline private[this] def haveAgreements = { - this._workingAgreementHistory.size > 0 + (this._userAgreementHistoryModel ne null) && this._userAgreementHistoryModel.size > 0 } - @inline private[this] def haveWorkingUserState = { - this._workingUserState ne null + @inline private[this] def haveUserState = { + this._userState ne null } @inline private[this] def haveUserStateBootstrap = { this._userStateBootstrap ne null } - private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = { - if(imEvent.isCreateUser) { + private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = { + val isCreateUser = MessageHelpers.isIMEventCreate(imEvent) + if(isCreateUser) { if(haveUserCreationIMEvent) { throw new AquariumInternalError( "Got user creation event (id=%s) but I already have one (id=%s)", - this._userCreationIMEvent.id, - imEvent.id + this._userCreationIMEvent.getOriginalID, + imEvent.getOriginalID ) } this._userCreationIMEvent = imEvent } - val effectiveFromMillis = imEvent.occurredMillis - val role = imEvent.role + val effectiveFromMillis = imEvent.getOccurredMillis + val role = imEvent.getRole // calling unsafe just for the side-effect - assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis)) - - val newAgreement = StdUserAgreement( - imEvent.id, - Some(imEvent.id), - effectiveFromMillis, - Long.MaxValue, - role, - PolicyDefinedFullPriceTableRef - ) + assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis)) + + // add to model (will update the underlying messages as well) + if(this._userAgreementHistoryModel eq null) { + this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, imEvent.getOriginalID) + } else { + val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID) + this._userAgreementHistoryModel += newUserAgreementModel + } + } - this._workingAgreementHistory += newAgreement + private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = { + this._latestIMEventOriginalID = imEvent.getOriginalID } - private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = { - this._latestIMEventID = imEvent.id + private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = { + this._latestResourceEventOriginalID = rcEvent.getOriginalID } /** @@ -170,7 +181,7 @@ class UserActor extends ReflectiveRoleableActor { } if(haveAgreements) { - DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString) + DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString) logSeparator() } } @@ -183,40 +194,29 @@ class UserActor extends ReflectiveRoleableActor { haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap } - private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = { + private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = { assert(this.haveAgreements, "this.haveAgreements") assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent") - val userCreationMillis = this._userCreationIMEvent.occurredMillis - val userCreationRole = this._userCreationIMEvent.role // initial role - val userCreationIMEventID = this._userCreationIMEvent.id - if(!haveUserStateBootstrap) { - this._userStateBootstrap = UserStateBootstrap( - this._userID, - userCreationMillis, - aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)), - aquarium.initialUserBalance(userCreationRole, userCreationMillis) - ) + this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent) } - + logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString)) val now = TimeHelpers.nowMillis() - this._workingUserState = chargingService.replayMonthChargingUpTo( + this._userState = chargingService.replayMonthChargingUpTo( BillingMonthInfo.fromMillis(now), now, this._userStateBootstrap, aquarium.currentResourceTypesMap, - InitialUserActorSetup(), - aquarium.userStateStore.insertUserState, - None + aquarium.userStateStore.insertUserState ) // Final touch: Update agreement history in the working user state. // The assumption is that all agreement changes go via IMEvents, so the // state this._workingAgreementHistory is always the authoritative source. - if(haveWorkingUserState) { - this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory) - DEBUG("Computed working user state %s", this._workingUserState.toJsonString) + if(haveUserState) { + this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel + DEBUG("Computed working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)) } } @@ -232,10 +232,10 @@ class UserActor extends ReflectiveRoleableActor { } // We will also need this functionality when receiving IMEvents, so we place it in a method - loadWorkingUserStateAndUpdateAgreementHistory() + loadUserStateAndUpdateAgreementHistory() - if(haveWorkingUserState) { - DEBUG("Initial working user state %s", this._workingUserState.toJsonString) + if(haveUserState) { + DEBUG("Initial working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)) logSeparator() } } @@ -249,46 +249,48 @@ class UserActor extends ReflectiveRoleableActor { } /** - * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s. + * Process [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s. * When this method is called, we assume that all proper checks have been made and it * is OK to proceed with the event processing. */ - def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = { - val imEvent = processEvent.imEvent + def onIMEventMsg(imEvent: IMEventMsg): Unit = { val hadUserCreationIMEvent = haveUserCreationIMEvent if(!haveAgreements) { - // This is an error. Should have been initialized from somewhere ... - throw new AquariumInternalError("No agreements. Cannot process %s", processEvent) + // This IMEvent has arrived after any ResourceEvents + INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent)) + initializeStateOfIMEvents() } + else { + if(this._latestIMEventOriginalID == imEvent.getOriginalID) { + // This happens when the actor is brought to life, then immediately initialized, and then + // sent the first IM event. But from the initialization procedure, this IM event will have + // already been loaded from DB! + INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent)) + logSeparator() + + //this._latestIMEventID = imEvent.id + return + } - if(this._latestIMEventID == imEvent.id) { - // This happens when the actor is brought to life, then immediately initialized, and then - // sent the first IM event. But from the initialization procedure, this IM event will have - // already been loaded from DB! - INFO("Ignoring first %s", imEvent.toDebugString) - logSeparator() - - //this._latestIMEventID = imEvent.id - return + updateAgreementHistoryFrom(imEvent) + updateLatestIMEventIDFrom(imEvent) } - updateAgreementHistoryFrom(imEvent) - updateLatestIMEventIDFrom(imEvent) - // Must also update user state if we know when in history the life of a user begins if(!hadUserCreationIMEvent && haveUserCreationIMEvent) { - loadWorkingUserStateAndUpdateAgreementHistory() + INFO("Processing user state, since we had a CREATE IMEvent") + loadUserStateAndUpdateAgreementHistory() } - } - def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { - val rcEvent = event.rcEvent + logSeparator() + } + def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = { if(!shouldProcessResourceEvents) { // This means the user has not been created (at least, as far as Aquarium is concerned). // So, we do not process any resource event - DEBUG("Not processing %s", rcEvent.toJsonString) + DEBUG("Not processing %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent)) logSeparator() return @@ -298,56 +300,133 @@ class UserActor extends ReflectiveRoleableActor { // we do not need to query the store. Just query the in-memory state. // Note: This is a similar situation with the first IMEvent received right after the user // actor is created. - if(this._latestResourceEventID == rcEvent.id) { - INFO("Ignoring first %s", rcEvent.toDebugString) + if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) { + INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent)) logSeparator() return } val now = TimeHelpers.nowMillis() - val billingMonthInfo = BillingMonthInfo.fromMillis(now) + // TODO: Review this and its usage in user state. + // TODO: The assumption is that the resource set increases all the time, + // TODO: so the current map contains everything ever known (assuming we do not run backwards in time). val currentResourcesMap = aquarium.currentResourceTypesMap - val calculationReason = RealtimeChargingReason(None, now) - val eventOccurredMillis = rcEvent.occurredMillis -// DEBUG("Using %s", currentResourceTypesMap.toJsonString) - if(rcEvent.occurredMillis >= _workingUserState.occurredMillis) { - chargingService.processResourceEvent( - rcEvent, - this._workingUserState, - calculationReason, - billingMonthInfo, - None - ) - } - else { - // Oops. Event is OUT OF SYNC - DEBUG("OUT OF SYNC %s", rcEvent.toDebugString) - this._workingUserState = chargingService.replayMonthChargingUpTo( - billingMonthInfo, + val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now) + val nowYear = nowBillingMonthInfo.year + val nowMonth = nowBillingMonthInfo.month + + val eventOccurredMillis = rcEvent.getOccurredMillis + val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) + val eventYear = eventBillingMonthInfo.year + val eventMonth = eventBillingMonthInfo.month + + def computeBatch(): Unit = { + DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID) + this._userState = chargingService.replayMonthChargingUpTo( + nowBillingMonthInfo, // Take into account that the event may be out-of-sync. // TODO: Should we use this._latestResourceEventOccurredMillis instead of now? now max eventOccurredMillis, this._userStateBootstrap, currentResourcesMap, - calculationReason, - stdUserStateStoreFunc, - None + stdUserStateStoreFunc ) + + updateLatestResourceEventIDFrom(rcEvent) } - DEBUG("Updated %s", this._workingUserState) + def computeRealtime(): Unit = { + DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID) + chargingService.processResourceEvent( + rcEvent, + this._userState, + nowBillingMonthInfo, + true + ) + + updateLatestResourceEventIDFrom(rcEvent) + } + + val oldTotalCredits = + if(this._userState!=null) + this._userState.totalCredits + else + 0.0D + // FIXME check these + if(this._userState eq null) { + computeBatch() + } + else if(nowYear != eventYear || nowMonth != eventMonth) { + DEBUG( + "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)", + nowYear, eventYear, + nowMonth, eventMonth + ) + computeBatch() + } + else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) { + DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis") + DEBUG( + "%s < %s", + TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis), + TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis) + ) + computeRealtime() + } + else { + DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s", + TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis), + TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)) + + computeBatch() + } + val newTotalCredits = this._userState.totalCredits + if(oldTotalCredits * newTotalCredits < 0) + aquarium.eventBus ! new BalanceEvent(this._userState.userID, + newTotalCredits>=0) + DEBUG("Updated %s", this._userState) logSeparator() } + def onGetUserBillRequest(event: GetUserBillRequest): Unit = { + try{ + val timeslot = event.timeslot + val resourceTypes = aquarium.policyStore. + loadSortedPolicyModelsWithin(timeslot.from.getTime, + timeslot.to.getTime). + values.headOption match { + case None => Map[String,ResourceType]() + case Some(policy:PolicyModel) => policy.resourceTypesMap + } + val state= if(haveUserState) Some(this._userState.msg) else None + val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes) + //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry) + //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString) + val billData = GetUserBillResponseData(this._userID,billEntryMsg) + sender ! GetUserBillResponse(Right(billData)) + } catch { + case e:Exception => + e.printStackTrace() + sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500) + } + } + def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { val userID = event.userID - (haveUserCreationIMEvent, haveWorkingUserState) match { + (haveUserCreationIMEvent, haveUserState) match { case (true, true) ⇒ // (User CREATEd, with balance state) - sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits))) + val realtimeMillis = TimeHelpers.nowMillis() + chargingService.calculateRealtimeUserState( + this._userState, + BillingMonthInfo.fromMillis(realtimeMillis), + realtimeMillis + ) + + sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userState.totalCredits))) case (true, false) ⇒ // (User CREATEd, no balance state) @@ -356,7 +435,7 @@ class UserActor extends ReflectiveRoleableActor { Right( GetUserBalanceResponseData( this._userID, - aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis) + aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis) ))) case (false, true) ⇒ @@ -372,15 +451,61 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserStateRequest(event: GetUserStateRequest): Unit = { - haveWorkingUserState match { + haveUserState match { case true ⇒ - sender ! GetUserStateResponse(Right(this._workingUserState)) + val realtimeMillis = TimeHelpers.nowMillis() + chargingService.calculateRealtimeUserState( + this._userState, + BillingMonthInfo.fromMillis(realtimeMillis), + realtimeMillis + ) + + sender ! GetUserStateResponse(Right(this._userState.msg)) case false ⇒ sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404) } } + def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = { + haveUserState match { + case true ⇒ + DEBUG("haveWorkingUserState: %s", event) + val realtimeMillis = TimeHelpers.nowMillis() + chargingService.calculateRealtimeUserState( + this._userState, + BillingMonthInfo.fromMillis(realtimeMillis), + realtimeMillis + ) + + sender ! GetUserWalletResponse( + Right( + GetUserWalletResponseData( + this._userID, + this._userState.totalCredits, + MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries) + ))) + + case false ⇒ + DEBUG("!haveWorkingUserState: %s", event) + haveUserCreationIMEvent match { + case true ⇒ + DEBUG("haveUserCreationIMEvent: %s", event) + sender ! GetUserWalletResponse( + Right( + GetUserWalletResponseData( + this._userID, + aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis), + MessageFactory.newWalletEntriesMsg() + ))) + + case false ⇒ + DEBUG("!haveUserCreationIMEvent: %s", event) + sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404) + } + } + } + private[this] def D_userID = { this._userID }