X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/53ea3611f79e90f4257f12c63eae31bb54d88d0c..060b04bd9458fd5ec3c68c80671595e24a09858a:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 7d339a4..d7604cc 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -37,7 +37,7 @@ package gr.grnet.aquarium.actor package service package user -import gr.grnet.aquarium.AquariumInternalError +import gr.grnet.aquarium.{Real, AquariumInternalError} import gr.grnet.aquarium.actor.message.GetUserBalanceRequest import gr.grnet.aquarium.actor.message.GetUserBalanceResponse import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData @@ -50,15 +50,14 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest import gr.grnet.aquarium.actor.message.GetUserWalletResponse import gr.grnet.aquarium.actor.message.GetUserWalletResponseData import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded -import gr.grnet.aquarium.charging.bill.AbstractBillEntry -import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap} -import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg} -import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers} +import gr.grnet.aquarium.charging.state.UserStateModel +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg} +import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers} import gr.grnet.aquarium.service.event.BalanceEvent import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} import gr.grnet.aquarium.policy.{ResourceType, PolicyModel} +import gr.grnet.aquarium.charging.bill.BillEntryMsg /** * @@ -66,23 +65,15 @@ import gr.grnet.aquarium.policy.{ResourceType, PolicyModel} */ class UserActor extends ReflectiveRoleableActor { - private[this] var _isFirstMessage = true - private[this] var _rcMsgCount = 0 private[this] var _imMsgCount = 0 - private[this] var _userID: String = "" - private[this] var _userState: UserStateModel = _ - private[this] var _userCreationIMEvent: IMEventMsg = _ - private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _ - private[this] var _latestIMEventOriginalID: String = "" - private[this] var _latestResourceEventOriginalID: String = "" - private[this] var _userStateBootstrap: UserStateBootstrap = _ - - def unsafeUserID = { - if(!haveUserID) { + private[this] var _userStateModel: UserStateModel = _ + + def userID = { + if(!haveUserState) { throw new AquariumInternalError("%s not initialized") } - this._userID + this._userStateModel.userID } override def postStop() { @@ -91,7 +82,7 @@ class UserActor extends ReflectiveRoleableActor { } private[this] def shutmedown(): Unit = { - if(haveUserID) { + if(haveUserState) { aquarium.akkaService.invalidateUserActor(this) } } @@ -107,317 +98,236 @@ class UserActor extends ReflectiveRoleableActor { private[this] def chargingService = aquarium.chargingService - private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ { - aquarium.userStateStore.insertUserState(userState) + def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } - @inline private[this] def haveUserID = { - this._userID ne null + private[this] def unsafeUserCreationIMEventMsg = { + this._userStateModel.unsafeUserCreationIMEvent } - @inline private[this] def haveUserCreationIMEvent = { - this._userCreationIMEvent ne null + private[this] def haveAgreements = { + (this._userStateModel ne null) } - def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { + private[this] def haveUserCreationEvent = { + haveAgreements && + this._userStateModel.hasUserCreationEvent } - @inline private[this] def haveAgreements = { - (this._userAgreementHistoryModel ne null) && this._userAgreementHistoryModel.size > 0 + private[this] def haveUserState = { + (this._userStateModel ne null) } - @inline private[this] def haveUserState = { - this._userState ne null - } + private[this] def isInitial = this._userStateModel.isInitial - @inline private[this] def haveUserStateBootstrap = { - this._userStateBootstrap ne null - } + /** + * Creates the agreement history from all the stored IMEvents. + * + * @return (`true` iff there was a user CREATE event, the number of events processed) + */ + private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = { + DEBUG("createUserAgreementHistoryFromStoredIMEvents()") + assert(haveUserState, "haveUserState") - private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = { - val isCreateUser = MessageHelpers.isIMEventCreate(imEvent) - if(isCreateUser) { - if(haveUserCreationIMEvent) { - throw new AquariumInternalError( - "Got user creation event (id=%s) but I already have one (id=%s)", - this._userCreationIMEvent.getOriginalID, - imEvent.getOriginalID - ) + + var _imcounter = 0 + + val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒ + _imcounter += 1 + DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent) + + if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) { + // The very first event must be a CREATE event. Otherwise we abort initialization. + // This will normally happen during devops :) + INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent)) + false } + else { + val effectiveFromMillis = imEvent.getOccurredMillis + val role = imEvent.getRole + // calling unsafe just for the side-effect + assert( + aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null, + "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis) + ) - this._userCreationIMEvent = imEvent + this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) + true + } } - val effectiveFromMillis = imEvent.getOccurredMillis - val role = imEvent.getRole - // calling unsafe just for the side-effect - assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis)) - - // add to model (will update the underlying messages as well) - if(this._userAgreementHistoryModel eq null) { - this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, imEvent.getOriginalID) - } else { - val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID) - this._userAgreementHistoryModel += newUserAgreementModel - } + this._imMsgCount = _imcounter + + DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) + (hadCreateEvent, _imcounter) } - private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = { - this._latestIMEventOriginalID = imEvent.getOriginalID + private[this] def saveFirstUserState(userID: String) { + this._userStateModel.userStateMsg.setIsFirst(true) + this._userStateModel.updateUserStateMsg( + aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) + ) } - private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = { - this._latestResourceEventOriginalID = rcEvent.getOriginalID + private[this] def saveSubsequentUserState() { + this._userStateModel.userStateMsg.setIsFirst(false) + this._userStateModel.updateUserStateMsg( + aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) + ) } - /** - * Creates the initial state that is related to IMEvents. - */ - private[this] def initializeStateOfIMEvents(): Unit = { - // NOTE: this._userID is already set up our caller - aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒ - DEBUG("Replaying %s", imEvent) + private[this] def loadLastKnownUserStateAndUpdateAgreements() { + val userID = this._userStateModel.userID + aquarium.userStateStore.findLatestUserState(userID) match { + case None ⇒ + // First user state ever + saveFirstUserState(userID) - updateAgreementHistoryFrom(imEvent) - updateLatestIMEventIDFrom(imEvent) + case Some(latestUserState) ⇒ + this._userStateModel.updateUserStateMsg(latestUserState) } } - /** - * Resource events are processed only if the user has been created and has agreements. - * Otherwise nothing can be computed. - */ - private[this] def shouldProcessResourceEvents: Boolean = { - haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap - } - - private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = { - assert(this.haveAgreements, "this.haveAgreements") - assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent") - - if(!haveUserStateBootstrap) { - this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent) - } - logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString)) - val now = TimeHelpers.nowMillis() - this._userState = chargingService.replayMonthChargingUpTo( - BillingMonthInfo.fromMillis(now), - now, - this._userStateBootstrap, - aquarium.currentResourceTypesMap, - aquarium.userStateStore.insertUserState + private[this] def processResourceEventsAfterLastKnownUserState() { + // Update the user state snapshot with fresh (ie not previously processed) events. + aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod( + this._userStateModel.userID, + this._userStateModel.latestResourceEventOccurredMillis, + TimeHelpers.nowMillis() ) + } - // Final touch: Update agreement history in the working user state. - // The assumption is that all agreement changes go via IMEvents, so the - // state this._workingAgreementHistory is always the authoritative source. - if(haveUserState) { - this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel - DEBUG("Computed working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)) - } + private[this] def makeUserStateMsgUpToDate() { + loadLastKnownUserStateAndUpdateAgreements() + processResourceEventsAfterLastKnownUserState() } - private[this] def initializeStateOfResourceEvents(): Unit = { - if(!this.haveAgreements) { - DEBUG("No agreements to initialize resources state") - return + private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = { + if(!isInitial) { + return false } - if(!this.haveUserCreationIMEvent) { - DEBUG("No CREATE IMEvent to initialize resources state") - return - } + val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID) - // We will also need this functionality when receiving IMEvents, so we place it in a method - loadUserStateAndUpdateAgreementHistory() - - if(haveUserState) { - DEBUG("Initial working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)) - logSeparator() + if(userCreated) { + makeUserStateMsgUpToDate() } - } - - /** - * Initializes the actor state from DB. - */ - def initializeUserActorState(userID: String): Unit = { - this._userID = userID - DEBUG("Initializing user actor state") + nextThing() - initializeStateOfIMEvents() - initializeStateOfResourceEvents() + true } /** - * Process [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s. - * When this method is called, we assume that all proper checks have been made and it - * is OK to proceed with the event processing. + * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the + * messaging hub (rabbitmq). */ def onIMEventMsg(imEvent: IMEventMsg) { - if(this._isFirstMessage) { - initializeUserActorState(imEvent.getUserID) - // we ignore this event, since it is already saved in the store and all messages in - // the store have been consulted by initializeUserActorState() - this._isFirstMessage = false + if(checkInitial()) { return } - val hadUserCreationIMEvent = haveUserCreationIMEvent + // Check for out of sync (regarding IMEvents) + val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis + if(isOutOfSyncIM) { + // clear all resource state + // FIXME implement - if(!haveAgreements) { - initializeStateOfIMEvents() + return } - else { - updateAgreementHistoryFrom(imEvent) - updateLatestIMEventIDFrom(imEvent) + + // Check out of sync (regarding ResourceEvents) + val isOutOfSyncRC = false // FIXME implement + if(isOutOfSyncRC) { + // TODO + + return } - // Must also update user state if we know when in history the life of a user begins - if(!hadUserCreationIMEvent && haveUserCreationIMEvent) { - INFO("Processing user state, since we had a CREATE IMEvent") - loadUserStateAndUpdateAgreementHistory() + // Make new agreement + this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) + this._imMsgCount += 1 + + if(MessageHelpers.isUserCreationIMEvent(imEvent)) { + makeUserStateMsgUpToDate() } - logSeparator() + DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) } def onResourceEventMsg(rcEvent: ResourceEventMsg) { - if(this._isFirstMessage) { - initializeUserActorState(rcEvent.getUserID) - // we ignore this event, since it is already saved in the store and all messages in - // the store have been consulted by initializeUserActorState() - this._isFirstMessage = false + if(checkInitial()) { return } - if(!shouldProcessResourceEvents) { - // This means the user has not been created (at least, as far as Aquarium is concerned). - // So, we do not process any resource event - DEBUG("Not processing %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent)) - logSeparator() + if(!haveUserCreationEvent) { + DEBUG("No agreements. Ignoring %s", rcEvent) return } - val now = TimeHelpers.nowMillis() - // TODO: Review this and its usage in user state. - // TODO: The assumption is that the resource set increases all the time, - // TODO: so the current map contains everything ever known (assuming we do not run backwards in time). - val currentResourcesMap = aquarium.currentResourceTypesMap - - val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now) - val nowYear = nowBillingMonthInfo.year - val nowMonth = nowBillingMonthInfo.month - - val eventOccurredMillis = rcEvent.getOccurredMillis - val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) - val eventYear = eventBillingMonthInfo.year - val eventMonth = eventBillingMonthInfo.month - - def computeBatch(): Unit = { - DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID) - this._userState = chargingService.replayMonthChargingUpTo( - nowBillingMonthInfo, - // Take into account that the event may be out-of-sync. - // TODO: Should we use this._latestResourceEventOccurredMillis instead of now? - now max eventOccurredMillis, - this._userStateBootstrap, - currentResourcesMap, - stdUserStateStoreFunc - ) - - updateLatestResourceEventIDFrom(rcEvent) - } + assert(haveUserState, "haveUserState") - def computeRealtime(): Unit = { - DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID) - chargingService.processResourceEvent( - rcEvent, - this._userState, - nowBillingMonthInfo, - true - ) + val oldTotalCredits = this._userStateModel.totalCreditsAsReal - updateLatestResourceEventIDFrom(rcEvent) - } + chargingService.processResourceEvent( + rcEvent.getReceivedMillis, + rcEvent, + this._userStateModel, + aquarium.currentResourceMapping, + true + ) - val oldTotalCredits = - if(this._userState!=null) - this._userState.totalCredits - else - 0.0D - // FIXME check these - if(this._userState eq null) { - computeBatch() - } - else if(nowYear != eventYear || nowMonth != eventMonth) { - DEBUG( - "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)", - nowYear, eventYear, - nowMonth, eventMonth - ) - computeBatch() - } - else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) { - DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis") - DEBUG( - "%s < %s", - TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis), - TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis) - ) - computeRealtime() - } - else { - DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s", - TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis), - TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)) + val newTotalCredits = this._userStateModel.totalCreditsAsReal - computeBatch() + if(oldTotalCredits.signum * newTotalCredits.signum < 0) { + aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0) } - val newTotalCredits = this._userState.totalCredits - if(oldTotalCredits * newTotalCredits < 0) - aquarium.eventBus ! new BalanceEvent(this._userState.userID, - newTotalCredits>=0) - DEBUG("Updated %s", this._userState) - logSeparator() + + DEBUG("Updated %s", this._userStateModel) } def onGetUserBillRequest(event: GetUserBillRequest): Unit = { + checkInitial() + try{ val timeslot = event.timeslot - val resourceTypes = aquarium.policyStore. + val resourceTypes: Map[String, ResourceType] = aquarium.policyStore. loadSortedPolicyModelsWithin(timeslot.from.getTime, timeslot.to.getTime). values.headOption match { case None => Map[String,ResourceType]() case Some(policy:PolicyModel) => policy.resourceTypesMap } - val state= if(haveUserState) Some(this._userState.msg) else None - val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state,resourceTypes) - val billData = GetUserBillResponseData(this._userID,billEntry) + val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None + val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes) + //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry) + //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString) + val billData = GetUserBillResponseData(this.userID,billEntryMsg) sender ! GetUserBillResponse(Right(billData)) } catch { case e:Exception => - e.printStackTrace() - sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500) + e.printStackTrace() + sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500) } } def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { + checkInitial() + val userID = event.userID - (haveUserCreationIMEvent, haveUserState) match { + (haveAgreements, haveUserState) match { case (true, true) ⇒ // (User CREATEd, with balance state) val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userState, - BillingMonthInfo.fromMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) - sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userState.totalCredits))) + sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits))) case (true, false) ⇒ // (User CREATEd, no balance state) @@ -425,9 +335,9 @@ class UserActor extends ReflectiveRoleableActor { sender ! GetUserBalanceResponse( Right( GetUserBalanceResponseData( - this._userID, - aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis) - ))) + this.userID, + aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString() + ))) case (false, true) ⇒ // (Not CREATEd, with balance state) @@ -442,16 +352,18 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserStateRequest(event: GetUserStateRequest): Unit = { + checkInitial() + haveUserState match { case true ⇒ val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userState, - BillingMonthInfo.fromMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) - sender ! GetUserStateResponse(Right(this._userState.msg)) + sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg)) case false ⇒ sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404) @@ -459,36 +371,38 @@ class UserActor extends ReflectiveRoleableActor { } def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = { + checkInitial() + haveUserState match { case true ⇒ DEBUG("haveWorkingUserState: %s", event) val realtimeMillis = TimeHelpers.nowMillis() chargingService.calculateRealtimeUserState( - this._userState, - BillingMonthInfo.fromMillis(realtimeMillis), + this._userStateModel, + aquarium.currentResourceMapping, realtimeMillis ) sender ! GetUserWalletResponse( Right( GetUserWalletResponseData( - this._userID, - this._userState.totalCredits, - MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries) - ))) + this.userID, + this._userStateModel.totalCredits, + MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries) + ))) case false ⇒ DEBUG("!haveWorkingUserState: %s", event) - haveUserCreationIMEvent match { + haveAgreements match { case true ⇒ - DEBUG("haveUserCreationIMEvent: %s", event) + DEBUG("haveAgreements: %s", event) sender ! GetUserWalletResponse( Right( GetUserWalletResponseData( - this._userID, - aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis), + this.userID, + Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)), MessageFactory.newWalletEntriesMsg() - ))) + ))) case false ⇒ DEBUG("!haveUserCreationIMEvent: %s", event) @@ -497,8 +411,24 @@ class UserActor extends ReflectiveRoleableActor { } } + /** + * Initializes the actor's internal state. + * + * @param userID + */ + def onSetUserActorUserID(userID: String) { + // Create the full agreement history from the original sources (IMEvents) + this._userStateModel = ModelFactory.newInitialUserStateModel( + userID, + Real(0), + TimeHelpers.nowMillis() + ) + + require(this._userStateModel.isInitial, "this._userStateModel.isInitial") + } + private[this] def D_userID = { - this._userID + if(haveUserState) userID else "???" } private[this] def DEBUG(fmt: String, args: Any*) =