// Reconstructed from the post-image of the diff published at
// https://code.grnet.gr/git/aquarium/blobdiff_plain/734338792ce702c496c1a9898ae42d76d679778d..060b04bd9458fd5ec3c68c80671595e24a09858a:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
// Anything that precedes the `package` clause in the real file was not part of that diff.

package gr.grnet.aquarium.actor
package service
package user

import gr.grnet.aquarium.{Real, AquariumInternalError}
import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
import gr.grnet.aquarium.actor.message.GetUserBillRequest
import gr.grnet.aquarium.actor.message.GetUserBillResponse
import gr.grnet.aquarium.actor.message.GetUserBillResponseData
import gr.grnet.aquarium.actor.message.GetUserStateRequest
import gr.grnet.aquarium.actor.message.GetUserStateResponse
import gr.grnet.aquarium.actor.message.GetUserWalletRequest
import gr.grnet.aquarium.actor.message.GetUserWalletResponse
import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
import gr.grnet.aquarium.charging.state.UserStateModel
import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
import gr.grnet.aquarium.service.event.BalanceEvent
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
import gr.grnet.aquarium.charging.bill.BillEntryMsg

/**
 * Actor that owns the [[gr.grnet.aquarium.charging.state.UserStateModel]] of one user.
 *
 * The model is created by `onSetUserActorUserID`. The first message handled afterwards
 * replays the stored IMEvents (see `checkInitial`); later IM/resource events update the
 * model incrementally, and the `onGetUser*Request` handlers answer from it.
 */
class UserActor extends ReflectiveRoleableActor {
  /** Number of IMEvents folded into the agreement history so far (written, never read here). */
  private[this] var _imMsgCount = 0

  /** `null` until `onSetUserActorUserID` has run. */
  private[this] var _userStateModel: UserStateModel = _

  /**
   * The ID of the user this actor serves.
   *
   * @throws AquariumInternalError if the user state model has not been set yet.
   */
  def userID: String = {
    if(!haveUserState) {
      // The original passed the raw "%s not initialized" template without ever filling it in.
      throw new AquariumInternalError("%s not initialized".format(shortClassNameOf(this)))
    }

    this._userStateModel.userID
  }

  /** Tells the akka service that this actor has been stopped. */
  override def postStop(): Unit = {
    DEBUG("I am finally stopped (in postStop())")
    aquarium.akkaService.notifyUserActorPostStop(this)
  }

  /** Asks the akka service to invalidate this actor; a no-op while there is no user state. */
  private[this] def shutmedown(): Unit = {
    if(haveUserState) {
      aquarium.akkaService.invalidateUserActor(this)
    }
  }

  /** Logs the failure (with its chain of causes) and then shuts this actor down. */
  override protected def onThrowable(t: Throwable, message: AnyRef): Unit = {
    LogHelpers.logChainOfCauses(logger, t)
    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)

    shutmedown()
  }

  def role = UserActorRole

  private[this] def chargingService = aquarium.chargingService

  /** Intentionally empty: this actor does nothing when the properties are loaded. */
  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
  }

  private[this] def unsafeUserCreationIMEventMsg = {
    this._userStateModel.unsafeUserCreationIMEvent
  }

  // NOTE(review): `haveAgreements` and `haveUserState` are the very same null check, so any
  // code branching on the pair can only ever see (true, true) or (false, false).
  private[this] def haveAgreements = {
    this._userStateModel ne null
  }

  private[this] def haveUserCreationEvent = {
    haveAgreements && this._userStateModel.hasUserCreationEvent
  }

  private[this] def haveUserState = {
    this._userStateModel ne null
  }

  // Dereferences the model without a null check; callers must run after onSetUserActorUserID.
  private[this] def isInitial = this._userStateModel.isInitial

  /**
   * Creates the agreement history from all the stored IMEvents.
   *
   * Replay is aborted (the callback returns `false`) when the very first stored event is
   * not a user CREATE event.
   *
   * @param userID the user whose IMEvents are replayed
   * @return (the result of the store's `foreachIMEventInOccurrenceOrder`, documented by the
   *         original author as `true` iff there was a user CREATE event;
   *         the number of events processed)
   */
  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
    DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
    assert(haveUserState, "haveUserState")

    var replayedCount = 0

    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
      replayedCount += 1
      DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), replayedCount, imEvent)

      if(replayedCount == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
        // The very first event must be a CREATE event. Otherwise we abort initialization.
        // This will normally happen during devops :)
        INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
        false
      }
      else {
        val effectiveFromMillis = imEvent.getOccurredMillis
        val imRole = imEvent.getRole

        // The lookup is wanted for its side effect. Scala's `assert` is elidable, so the
        // call is made outside the assertion to survive builds that compile assertions out.
        val priceTable = aquarium.unsafeFullPriceTableForRoleAt(imRole, effectiveFromMillis)
        assert(
          priceTable ne null,
          "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(imRole, effectiveFromMillis)
        )

        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
        true
      }
    }

    this._imMsgCount = replayedCount

    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
    (hadCreateEvent, replayedCount)
  }

  /**
   * Marks the current user state message as the first one, stores it and adopts whatever
   * the store returns.
   *
   * @param userID not used by the body; kept so existing call sites stay valid
   */
  private[this] def saveFirstUserState(userID: String): Unit = {
    this._userStateModel.userStateMsg.setIsFirst(true)
    this._userStateModel.updateUserStateMsg(
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
    )
  }

  /** Same as `saveFirstUserState` but with the "is first" flag cleared. */
  private[this] def saveSubsequentUserState(): Unit = {
    this._userStateModel.userStateMsg.setIsFirst(false)
    this._userStateModel.updateUserStateMsg(
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
    )
  }

  /** Adopts the latest stored user state, or stores a first one when none exists. */
  private[this] def loadLastKnownUserStateAndUpdateAgreements(): Unit = {
    val modelUserID = this._userStateModel.userID

    aquarium.userStateStore.findLatestUserState(modelUserID) match {
      case None ⇒
        // First user state ever
        saveFirstUserState(modelUserID)

      case Some(latestUserState) ⇒
        this._userStateModel.updateUserStateMsg(latestUserState)
    }
  }

  /**
   * Asks the resource event store for the events that occurred between the latest resource
   * event known to the model and now. Whatever the store call returns is discarded.
   */
  private[this] def processResourceEventsAfterLastKnownUserState(): Unit = {
    // Update the user state snapshot with fresh (ie not previously processed) events.
    aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod(
      this._userStateModel.userID,
      this._userStateModel.latestResourceEventOccurredMillis,
      TimeHelpers.nowMillis()
    )
  }

  private[this] def makeUserStateMsgUpToDate(): Unit = {
    loadLastKnownUserStateAndUpdateAgreements()
    processResourceEventsAfterLastKnownUserState()
  }

  /**
   * Performs the one-off initialization while the model is still in its initial state:
   * replays the stored IMEvents and, if that reported a user CREATE event, brings the
   * user state up to date. `nextThing` runs afterwards.
   *
   * NOTE(review): `isInitial` dereferences the model, so this throws if called before
   * `onSetUserActorUserID`.
   *
   * @param nextThing callback invoked after initialization (only when it happened)
   * @return `true` iff the initialization ran in this call
   */
  private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
    if(!isInitial) {
      false
    }
    else {
      val (userCreated, _) = createUserAgreementHistoryFromIMEvents(userID)

      if(userCreated) {
        makeUserStateMsgUpToDate()
      }

      nextThing()

      true
    }
  }

  /**
   * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
   * messaging hub (rabbitmq).
   *
   * Nothing further is done when this call triggered the initial replay, or when the event
   * is older than the latest IMEvent known to the model.
   */
  def onIMEventMsg(imEvent: IMEventMsg): Unit = {
    if(!checkInitial()) {
      // Check for out of sync (regarding IMEvents)
      val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
      // Check out of sync (regarding ResourceEvents)
      val isOutOfSyncRC = false // FIXME implement

      if(isOutOfSyncIM) {
        // clear all resource state
        // FIXME implement
      }
      else if(isOutOfSyncRC) {
        // TODO
      }
      else {
        // Make new agreement
        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
        this._imMsgCount += 1

        if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
          makeUserStateMsgUpToDate()
        }

        DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
      }
    }
  }

  /**
   * Charges a resource event against the user state and publishes a
   * [[gr.grnet.aquarium.service.event.BalanceEvent]] when the total credits change sign.
   *
   * The event is ignored when this call triggered the initial replay or when the model has
   * no user creation event.
   */
  def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
    if(!checkInitial()) {
      if(!haveUserCreationEvent) {
        DEBUG("No agreements. Ignoring %s", rcEvent)
      }
      else {
        assert(haveUserState, "haveUserState")

        val oldTotalCredits = this._userStateModel.totalCreditsAsReal

        chargingService.processResourceEvent(
          rcEvent.getReceivedMillis,
          rcEvent,
          this._userStateModel,
          aquarium.currentResourceMapping,
          true
        )

        val newTotalCredits = this._userStateModel.totalCreditsAsReal

        // NOTE(review): a signum product is only negative for strict +/- crossings, so a move
        // from exactly zero to negative (or back to zero) publishes nothing - confirm intent.
        if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
          aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
        }

        DEBUG("Updated %s", this._userStateModel)
      }
    }
  }

  /**
   * Replies with the user's bill for the requested timeslot.
   *
   * Resource types are taken from the first policy model the policy store returns for the
   * timeslot (none means an empty map). Any `Exception` raised while building the bill is
   * logged and answered with a 500 response.
   */
  def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
    checkInitial()

    try {
      val timeslot = event.timeslot
      val firstPolicy = aquarium.policyStore.
        loadSortedPolicyModelsWithin(timeslot.from.getTime, timeslot.to.getTime).
        values.
        headOption

      val resourceTypes: Map[String, ResourceType] = firstPolicy match {
        case None ⇒ Map[String, ResourceType]()
        case Some(policy: PolicyModel) ⇒ policy.resourceTypesMap
      }

      val state = if(haveUserState) Some(this._userStateModel.userStateMsg) else None
      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot, this.userID, state, resourceTypes)
      val billData = GetUserBillResponseData(this.userID, billEntryMsg)

      sender ! GetUserBillResponse(Right(billData))
    }
    catch {
      case e: Exception ⇒
        // Goes through the actor's logger instead of printing the stack trace to stderr.
        ERROR(e, "Could not compute bill for %s", event)
        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
    }
  }

  /**
   * Replies with the user's balance after recalculating the realtime user state, or with an
   * error response.
   *
   * NOTE(review): both tested predicates are the same null check on the model, so only the
   * (true, true) and (false, false) cases can currently be selected.
   */
  def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
    checkInitial()

    val requestedUserID = event.userID

    (haveAgreements, haveUserState) match {
      case (true, true) ⇒
        // (User CREATEd, with balance state)
        val realtimeMillis = TimeHelpers.nowMillis()
        chargingService.calculateRealtimeUserState(
          this._userStateModel,
          aquarium.currentResourceMapping,
          realtimeMillis
        )

        val balanceData = GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)
        sender ! GetUserBalanceResponse(Right(balanceData))

      case (true, false) ⇒
        // (User CREATEd, no balance state)
        // Return the default initial balance
        val creationEvent = this.unsafeUserCreationIMEventMsg
        val initialBalance = aquarium.initialUserBalance(creationEvent.getRole, creationEvent.getOccurredMillis)

        sender ! GetUserBalanceResponse(
          Right(GetUserBalanceResponseData(this.userID, initialBalance.toString()))
        )

      case (false, true) ⇒
        // (Not CREATEd, with balance state)
        // Clearly this is internal error
        sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)

      case (false, false) ⇒
        // (Not CREATEd, no balance state)
        // The user is completely unknown
        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(requestedUserID)), 404/*Not found*/)
    }
  }

  /**
   * Replies with the user state message after recalculating the realtime user state, or
   * with a 404 response when there is no user state.
   */
  def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      val realtimeMillis = TimeHelpers.nowMillis()
      chargingService.calculateRealtimeUserState(
        this._userStateModel,
        aquarium.currentResourceMapping,
        realtimeMillis
      )

      sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
    }
    else {
      sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
    }
  }

  /**
   * Replies with the user's total credits and wallet entries after recalculating the
   * realtime user state; without user state it falls back to the initial balance with an
   * empty wallet, or to a 404 response.
   */
  def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      DEBUG("haveWorkingUserState: %s", event)
      val realtimeMillis = TimeHelpers.nowMillis()
      chargingService.calculateRealtimeUserState(
        this._userStateModel,
        aquarium.currentResourceMapping,
        realtimeMillis
      )

      val walletData = GetUserWalletResponseData(
        this.userID,
        this._userStateModel.totalCredits,
        MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
      )
      sender ! GetUserWalletResponse(Right(walletData))
    }
    else {
      DEBUG("!haveWorkingUserState: %s", event)

      if(haveAgreements) {
        DEBUG("haveAgreements: %s", event)
        val creationEvent = this.unsafeUserCreationIMEventMsg
        val initialBalance = aquarium.initialUserBalance(creationEvent.getRole, creationEvent.getOccurredMillis)

        val walletData = GetUserWalletResponseData(
          this.userID,
          Real.toMsgField(initialBalance),
          MessageFactory.newWalletEntriesMsg()
        )
        sender ! GetUserWalletResponse(Right(walletData))
      }
      else {
        DEBUG("!haveUserCreationIMEvent: %s", event)
        // Error code normalised from "AQU-WAL-00 8" (stray space) to the four-digit form
        // used by every other code in this class.
        sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
      }
    }
  }

  /**
   * Initializes the actor's internal state with a fresh, initial user state model
   * (zero credits, timestamped now).
   *
   * @param userID the user this actor will serve
   */
  def onSetUserActorUserID(userID: String): Unit = {
    // The agreement history itself is rebuilt later, from the original sources (IMEvents).
    this._userStateModel = ModelFactory.newInitialUserStateModel(
      userID,
      Real(0),
      TimeHelpers.nowMillis()
    )

    require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
  }

  /** User ID for log prefixes; a placeholder while the model is not set. */
  private[this] def D_userID = {
    if(haveUserState) userID else "???"
  }

  /** Builds the "[userID] - message" line shared by all log helpers. */
  private[this] def logLine(fmt: String, args: Seq[Any]): String =
    "[%s] - %s".format(D_userID, fmt.format(args: _*))

  private[this] def DEBUG(fmt: String, args: Any*) =
    logger.debug(logLine(fmt, args))

  private[this] def INFO(fmt: String, args: Any*) =
    logger.info(logLine(fmt, args))

  private[this] def WARN(fmt: String, args: Any*) =
    logger.warn(logLine(fmt, args))

  private[this] def ERROR(fmt: String, args: Any*) =
    logger.error(logLine(fmt, args))

  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
    logger.error(logLine(fmt, args), t)
}