X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/d7f5340d67d5fb608835239ed8c34f03ba910687..db5e3882a783cd83c5d50e17f00bfc8211d7d52b:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index f1feaeb..59b52f2 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -37,7 +37,7 @@ package gr.grnet.aquarium.actor package service package user -import gr.grnet.aquarium.AquariumInternalError +import gr.grnet.aquarium.{Real, AquariumInternalError} import gr.grnet.aquarium.actor.message.GetUserBalanceRequest import gr.grnet.aquarium.actor.message.GetUserBalanceResponse import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData @@ -50,17 +50,14 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest import gr.grnet.aquarium.actor.message.GetUserWalletResponse import gr.grnet.aquarium.actor.message.GetUserWalletResponseData import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded -import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap} -import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg} -import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers} +import gr.grnet.aquarium.charging.state.UserStateModel +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg} +import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers} import gr.grnet.aquarium.service.event.BalanceEvent import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} import gr.grnet.aquarium.policy.{ResourceType, PolicyModel} import gr.grnet.aquarium.charging.bill.BillEntryMsg -import gr.grnet.aquarium.event.CreditsModel -import java.util /** * @@ -68,18 +65,15 @@ import java.util */ class UserActor extends ReflectiveRoleableActor { - private[this] var _rcMsgCount = 0 private[this] var _imMsgCount = 0 - private[this] var _userID: String = "???" - private[this] var _userStateMsg: UserStateMsg = _ - private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _ + private[this] var _userStateModel: UserStateModel = _ - def unsafeUserID = { - if(!haveUserID) { + def userID = { + if(!haveUserState) { throw new AquariumInternalError("%s not initialized") } - this._userID + this._userStateModel.userID } override def postStop() { @@ -88,7 +82,7 @@ class UserActor extends ReflectiveRoleableActor { } private[this] def shutmedown(): Unit = { - if(haveUserID) { + if(haveUserState) { aquarium.akkaService.invalidateUserActor(this) } } @@ -104,48 +98,41 @@ class UserActor extends ReflectiveRoleableActor { private[this] def chargingService = aquarium.chargingService - private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ { - aquarium.userStateStore.insertUserState(userState) - } - def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } - private[this] def haveUserID = this._userID ne null - private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent - private[this] def haveAgreements = this._userAgreementHistoryModel ne null - private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent - private[this] def haveUserState = this._userStateMsg ne null + private[this] def unsafeUserCreationIMEventMsg = { + this._userStateModel.unsafeUserCreationIMEvent + } - private[this] def createInitialUserStateMsgFromCreateIMEvent() { - assert(haveAgreements, "haveAgreements") - assert(isUserCreated, "isUserCreated") - assert(this._userAgreementHistoryModel.hasUserCreationEvent, "this._userAgreementHistoryModel.hasUserCreationEvent") + private[this] def haveAgreements = { + (this._userStateModel ne null) + } - val userCreationIMEventMsg = unsafeUserCreationIMEventMsg - val userStateBootstrap = aquarium.getUserStateBootstrap(userCreationIMEventMsg) + private[this] def haveUserCreationEvent = { + haveAgreements && + this._userStateModel.hasUserCreationEvent + } - this._userStateMsg = MessageFactory.newInitialUserStateMsg( - this._userID, - CreditsModel.from(0.0), - TimeHelpers.nowMillis() - ) + private[this] def haveUserState = { + (this._userStateModel ne null) } + private[this] def isInitial = this._userStateModel.isInitial + /** * Creates the agreement history from all the stored IMEvents. * * @return (`true` iff there was a user CREATE event, the number of events processed) */ - private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = { + private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = { DEBUG("createUserAgreementHistoryFromStoredIMEvents()") - val historyMsg = MessageFactory.newUserAgreementHistoryMsg(this._userID) - this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg) + assert(haveUserState, "haveUserState") + - // NOTE: this._userID is already set up our caller var _imcounter = 0 - val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒ + val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒ _imcounter += 1 DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent) @@ -164,31 +151,72 @@ class UserActor extends ReflectiveRoleableActor { "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis) ) - this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent) + this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) true } } - DEBUG("Agreements: %s", this._userAgreementHistoryModel) + this._imMsgCount = _imcounter + + DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) (hadCreateEvent, _imcounter) } + private[this] def saveFirstUserState(userID: String) { + this._userStateModel.userStateMsg.setIsFirst(true) + this._userStateModel.updateUserStateMsg( + aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) + ) + } + + private[this] def saveSubsequentUserState() { + this._userStateModel.userStateMsg.setIsFirst(false) + this._userStateModel.updateUserStateMsg( + aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) + ) + } + + private[this] def loadLastKnownUserStateAndUpdateAgreements(historyMsg: UserAgreementHistoryMsg) { + val userID = historyMsg.getUserID + val latestUserStateOpt = aquarium.userStateStore.findLatestUserState(userID) + latestUserStateOpt match { + case None ⇒ + // First user state ever + saveFirstUserState(userID) + + case Some(latestUserState) ⇒ + this._userStateModel.updateUserStateMsg(latestUserState) + this._userStateModel.updateUserAgreementHistoryMsg(historyMsg) + } + } + + private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = { + if(!isInitial) { + return false + } + + val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID) + + if(userCreated) { + loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg) + } + + nextThing() + + true + } + /** * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the * messaging hub (rabbitmq). */ def onIMEventMsg(imEvent: IMEventMsg) { - if(!isUserCreated && MessageHelpers.isIMEventCreate(imEvent)) { - assert(this._imMsgCount == 0, "this._imMsgCount == 0") - // Create the full agreement history from the original sources (IMEvents) - val (userCreated, imEventsCount) = createUserAgreementHistoryFromStoredIMEvents() - - this._imMsgCount = imEventsCount + if(checkInitial()) { return } // Check for out of sync (regarding IMEvents) - val isOutOfSyncIM = imEvent.getOccurredMillis < this._userAgreementHistoryModel.latestIMEventOccurredMillis + val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis if(isOutOfSyncIM) { // clear all resource state // FIXME implement @@ -204,105 +232,52 @@ class UserActor extends ReflectiveRoleableActor { return } - // OK, seems good - assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)") - // Make new agreement - this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent) + this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) this._imMsgCount += 1 - DEBUG("Agreements: %s", this._userAgreementHistoryModel) + + if(haveUserCreationEvent) { + loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg) + } + + DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) } def onResourceEventMsg(rcEvent: ResourceEventMsg) { - if(!isUserCreated) { - DEBUG("No agreements. Ignoring %s", rcEvent) - + if(checkInitial()) { return } - val now = TimeHelpers.nowMillis() - val resourceMapping = aquarium.resourceMappingAtMillis(now) + if(!haveUserCreationEvent) { + DEBUG("No agreements. Ignoring %s", rcEvent) - val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now) - val nowYear = nowBillingMonthInfo.year - val nowMonth = nowBillingMonthInfo.month + return + } - val eventOccurredMillis = rcEvent.getOccurredMillis - val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) - val eventYear = eventBillingMonthInfo.year - val eventMonth = eventBillingMonthInfo.month + assert(haveUserState, "haveUserState") - def computeBatch(): Unit = { - DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID) + val oldTotalCredits = this._userStateModel.totalCreditsAsReal - this._userStateMsg = chargingService.replayMonthChargingUpTo( - this._userAgreementHistoryModel, - nowBillingMonthInfo, - // Take into account that the event may be out-of-sync. - // TODO: Should we use this._latestResourceEventOccurredMillis instead of now? - now max eventOccurredMillis, - resourceMapping, - stdUserStateStoreFunc - ) + chargingService.processResourceEvent( + rcEvent.getReceivedMillis, + rcEvent, + this._userStateModel, + aquarium.currentResourceMapping, + true + ) - } + val newTotalCredits = this._userStateModel.totalCreditsAsReal - def computeRealtime(): Unit = { - DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID) - chargingService.processResourceEvent( - rcEvent, - this._userAgreementHistoryModel, - this._userStateMsg, - nowBillingMonthInfo, - true, - resourceMapping - ) - - this._rcMsgCount += 1 + if(oldTotalCredits.signum * newTotalCredits.signum < 0) { + aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0) } - val oldTotalCredits = - if(this._userStateMsg!=null) - this._userStateMsg.totalCredits - else - 0.0D - // FIXME check these - if(this._userStateMsg eq null) { - computeBatch() - } - else if(nowYear != eventYear || nowMonth != eventMonth) { - DEBUG( - "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)", - nowYear, eventYear, - nowMonth, eventMonth - ) - computeBatch() - } - else if(this._userStateMsg.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) { - DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis") - DEBUG( - "%s < %s", - TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis), - TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis) - ) - computeRealtime() - } - else { - DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s", - TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis), - TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)) - - computeBatch() - } - val newTotalCredits = this._userStateMsg.totalCredits - if(oldTotalCredits * newTotalCredits < 0) - aquarium.eventBus ! new BalanceEvent(this._userStateMsg.userID, - newTotalCredits>=0) - DEBUG("Updated %s", this._userStateMsg) - logSeparator() + DEBUG("Updated %s", this._userStateModel) } def onGetUserBillRequest(event: GetUserBillRequest): Unit = { + checkInitial() + try{ val timeslot = event.timeslot val resourceTypes: Map[String, ResourceType] = aquarium.policyStore. @@ -312,11 +287,11 @@ class UserActor extends ReflectiveRoleableActor { case None => Map[String,ResourceType]() case Some(policy:PolicyModel) => policy.resourceTypesMap } - val state= if(haveUserState) Some(this._userStateMsg) else None - val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes) + val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None + val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes) //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry) //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString) - val billData = GetUserBillResponseData(this._userID,billEntryMsg) + val billData = GetUserBillResponseData(this.userID,billEntryMsg) sender ! GetUserBillResponse(Right(billData)) } catch { case e:Exception => @@ -326,6 +301,8 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { + checkInitial() + val userID = event.userID (haveAgreements, haveUserState) match { @@ -333,14 +310,12 @@ class UserActor extends ReflectiveRoleableActor { // (User CREATEd, with balance state) val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userAgreementHistoryModel, - this._userStateMsg, - BillingMonthInfo.fromMillis(realtimeMillis), - aquarium.resourceMappingAtMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) - sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userStateMsg.totalCredits))) + sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits))) case (true, false) ⇒ // (User CREATEd, no balance state) @@ -348,8 +323,8 @@ class UserActor extends ReflectiveRoleableActor { sender ! GetUserBalanceResponse( Right( GetUserBalanceResponseData( - this._userID, - aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis) + this.userID, + aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString() ))) case (false, true) ⇒ @@ -365,18 +340,18 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserStateRequest(event: GetUserStateRequest): Unit = { + checkInitial() + haveUserState match { case true ⇒ val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userAgreementHistoryModel, - this._userStateMsg, - BillingMonthInfo.fromMillis(realtimeMillis), - aquarium.resourceMappingAtMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) - sender ! GetUserStateResponse(Right(this._userStateMsg)) + sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg)) case false ⇒ sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404) @@ -384,24 +359,24 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = { + checkInitial() + haveUserState match { case true ⇒ DEBUG("haveWorkingUserState: %s", event) val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userAgreementHistoryModel, - this._userStateMsg, - BillingMonthInfo.fromMillis(realtimeMillis), - aquarium.resourceMappingAtMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) sender ! GetUserWalletResponse( Right( GetUserWalletResponseData( - this._userID, - this._userStateMsg.totalCredits, - MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries) + this.userID, + this._userStateModel.totalCredits, + MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries) ))) case false ⇒ @@ -412,8 +387,8 @@ class UserActor extends ReflectiveRoleableActor { sender ! GetUserWalletResponse( Right( GetUserWalletResponseData( - this._userID, - aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis), + this.userID, + Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)), MessageFactory.newWalletEntriesMsg() ))) @@ -424,12 +399,24 @@ class UserActor extends ReflectiveRoleableActor { } } + /** + * Initializes the actor's internal state. + * + * @param userID + */ def onSetUserActorUserID(userID: String) { - this._userID = userID + // Create the full agreement history from the original sources (IMEvents) + this._userStateModel = ModelFactory.newInitialUserStateModel( + userID, + Real(0), + TimeHelpers.nowMillis() + ) + + require(this._userStateModel.isInitial, "this._userStateModel.isInitial") } private[this] def D_userID = { - this._userID + if(haveUserState) userID else "???" } private[this] def DEBUG(fmt: String, args: Any*) =