// Post-image of src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
// as described by blobdiff 0367eb0..060b04b; lines removed by that diff are not kept.
package gr.grnet.aquarium.actor
package service
package user

import gr.grnet.aquarium.{Real, AquariumInternalError}
import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
import gr.grnet.aquarium.actor.message.GetUserBillRequest
import gr.grnet.aquarium.actor.message.GetUserBillResponse
import gr.grnet.aquarium.actor.message.GetUserBillResponseData
import gr.grnet.aquarium.actor.message.GetUserStateRequest
import gr.grnet.aquarium.actor.message.GetUserStateResponse
import gr.grnet.aquarium.actor.message.GetUserWalletRequest
import gr.grnet.aquarium.actor.message.GetUserWalletResponse
import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
import gr.grnet.aquarium.charging.state.UserStateModel
import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
import gr.grnet.aquarium.service.event.BalanceEvent
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
import gr.grnet.aquarium.charging.bill.BillEntryMsg

/**
 * Actor that owns the [[gr.grnet.aquarium.charging.state.UserStateModel]] of one user.
 *
 * It consumes IM and resource event messages, keeps the user state model up to date
 * and answers balance, bill, state and wallet requests for that user.
 *
 * The actor is unusable until `onSetUserActorUserID` has installed the initial model.
 *
 * @author Christos KK Loverdos
 */
class UserActor extends ReflectiveRoleableActor {
  // Set to the number of replayed IMEvents and then bumped once per live IMEvent.
  // It is only written, never read, inside this class.
  private[this] var _imMsgCount = 0

  // `null` until `onSetUserActorUserID` runs; every `have*` predicate below keys off that.
  private[this] var _userStateModel: UserStateModel = _

  /**
   * The ID of the user this actor serves.
   *
   * Throws [[gr.grnet.aquarium.AquariumInternalError]] if no user state model has been installed yet.
   */
  def userID = {
    if(!haveUserState) {
      // The original message left the `%s` placeholder unformatted.
      throw new AquariumInternalError("%s not initialized".format(shortClassNameOf(this)))
    }

    this._userStateModel.userID
  }

  /** Logs the stop and notifies the akka service that this actor has stopped. */
  override def postStop(): Unit = {
    DEBUG("I am finally stopped (in postStop())")
    aquarium.akkaService.notifyUserActorPostStop(this)
  }

  /** Asks the akka service to invalidate this actor, but only if it ever got a user state model. */
  private[this] def shutmedown(): Unit = {
    if(haveUserState) {
      aquarium.akkaService.invalidateUserActor(this)
    }
  }

  /**
   * Failure hook: logs the chain of causes and the error, then requests invalidation of this actor.
   *
   * @param t       the failure
   * @param message the message being handled when `t` was thrown (not used here)
   */
  override protected def onThrowable(t: Throwable, message: AnyRef) = {
    LogHelpers.logChainOfCauses(logger, t)
    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)

    shutmedown()
  }

  def role = UserActorRole

  private[this] def chargingService = aquarium.chargingService

  /** Intentionally a no-op. */
  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
  }

  private[this] def unsafeUserCreationIMEventMsg = {
    this._userStateModel.unsafeUserCreationIMEvent
  }

  // NOTE(review): this is the very same test as `haveUserState`. As a consequence the
  // `(true, false)` / `(false, true)` cases in `onGetUserBalanceRequest` and the inner
  // `haveAgreements` branch in `onGetUserWalletRequest` can never be taken. Left as is,
  // because the intended distinction cannot be derived from this file alone.
  private[this] def haveAgreements = {
    (this._userStateModel ne null)
  }

  private[this] def haveUserCreationEvent = {
    haveAgreements &&
    this._userStateModel.hasUserCreationEvent
  }

  private[this] def haveUserState = {
    (this._userStateModel ne null)
  }

  // NOTE(review): dereferences the model without a null check, so any handler that runs
  // before `onSetUserActorUserID` fails here with a NullPointerException.
  private[this] def isInitial = this._userStateModel.isInitial

  /**
   * Creates the agreement history from all the stored IMEvents.
   *
   * The very first stored event must be a user CREATE event; if it is not, it is rejected
   * (the callback answers `false` for it). Every accepted event is turned into a user
   * agreement on the state model, after checking that a full price table exists for the
   * event's role at the event's occurrence time.
   *
   * @param userID the user whose IMEvents are replayed
   * @return (`true` iff there was a user CREATE event, the number of events processed)
   */
  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
    DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
    assert(haveUserState, "haveUserState")

    var replayedCount = 0

    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
      replayedCount += 1
      DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), replayedCount, imEvent)

      if(replayedCount == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
        // The very first event must be a CREATE event. Otherwise we abort initialization.
        // This will normally happen during devops :)
        INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
        false
      }
      else {
        val effectiveFromMillis = imEvent.getOccurredMillis
        val role = imEvent.getRole
        // calling unsafe just for the side-effect
        assert(
          aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
          "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
        )

        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
        true
      }
    }

    this._imMsgCount = replayedCount

    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
    (hadCreateEvent, replayedCount)
  }

  /**
   * Flags the current user state message as first/subsequent, inserts it in the user state
   * store and replaces the model's message with whatever the store hands back.
   */
  private[this] def persistUserState(isFirst: Boolean): Unit = {
    this._userStateModel.userStateMsg.setIsFirst(isFirst)
    this._userStateModel.updateUserStateMsg(
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
    )
  }

  /**
   * Persists the current user state as the first one ever.
   *
   * @param userID not used; kept for signature compatibility
   */
  private[this] def saveFirstUserState(userID: String): Unit = {
    persistUserState(isFirst = true)
  }

  /** Persists the current user state as a non-first one. */
  private[this] def saveSubsequentUserState(): Unit = {
    persistUserState(isFirst = false)
  }

  /**
   * Adopts the latest user state found in the store; if the store has none, the current
   * (initial) state is saved as the first one.
   */
  private[this] def loadLastKnownUserStateAndUpdateAgreements(): Unit = {
    val userID = this._userStateModel.userID
    aquarium.userStateStore.findLatestUserState(userID) match {
      case None ⇒
        // First user state ever
        saveFirstUserState(userID)

      case Some(latestUserState) ⇒
        this._userStateModel.updateUserStateMsg(latestUserState)
    }
  }

  private[this] def processResourceEventsAfterLastKnownUserState(): Unit = {
    // Update the user state snapshot with fresh (ie not previously processed) events.
    // NOTE(review): no per-event callback is passed here, so from this file it looks like
    // the events in the period are never fed to the charging service - confirm against
    // the signature of `foreachResourceEventOccurredInPeriod`.
    aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod(
      this._userStateModel.userID,
      this._userStateModel.latestResourceEventOccurredMillis,
      TimeHelpers.nowMillis()
    )
  }

  private[this] def makeUserStateMsgUpToDate(): Unit = {
    loadLastKnownUserStateAndUpdateAgreements()
    processResourceEventsAfterLastKnownUserState()
  }

  /**
   * Performs the one-off initialization if the model is still in its initial condition:
   * replays the stored IMEvents, brings the user state up to date when a CREATE event was
   * found, and finally runs `nextThing`.
   *
   * @param nextThing continuation to run after the initialization (no-op by default)
   * @return `true` iff the model was initial when called, i.e. initialization was attempted
   */
  private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
    if(!isInitial) {
      false
    }
    else {
      val (userCreated, _) = createUserAgreementHistoryFromIMEvents(userID)

      if(userCreated) {
        makeUserStateMsgUpToDate()
      }

      nextThing()

      true
    }
  }

  /** Asks the charging service to recalculate the realtime user state as of now. */
  private[this] def calculateRealtimeUserStateNow(): Unit = {
    val realtimeMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userStateModel,
      aquarium.currentResourceMapping,
      realtimeMillis
    )
  }

  /** Initial balance for the role and occurrence time of the user creation IMEvent. */
  private[this] def initialBalanceFromCreationEvent = {
    val creationEvent = this.unsafeUserCreationIMEventMsg
    aquarium.initialUserBalance(creationEvent.getRole, creationEvent.getOccurredMillis)
  }

  /**
   * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
   * messaging hub (rabbitmq).
   *
   * If this call triggered the one-off initialization the message is not processed any
   * further (presumably because the replay from the store already covered it - verify).
   */
  def onIMEventMsg(imEvent: IMEventMsg): Unit = {
    if(checkInitial()) {
      return
    }

    // Check for out of sync (regarding IMEvents)
    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
    if(isOutOfSyncIM) {
      // clear all resource state
      // FIXME implement

      return
    }

    // Check out of sync (regarding ResourceEvents)
    val isOutOfSyncRC = false // FIXME implement
    if(isOutOfSyncRC) {
      // TODO

      return
    }

    // Make new agreement
    this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
    this._imMsgCount += 1

    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
      makeUserStateMsgUpToDate()
    }

    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
  }

  /**
   * Charges a resource event against the user state and publishes a
   * [[gr.grnet.aquarium.service.event.BalanceEvent]] when the total credits move between
   * the non-negative and the negative range.
   *
   * The event is ignored when this call triggered the one-off initialization or when the
   * user has no creation event.
   */
  def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
    if(checkInitial()) {
      return
    }

    if(!haveUserCreationEvent) {
      DEBUG("No agreements. Ignoring %s", rcEvent)

      return
    }

    assert(haveUserState, "haveUserState")

    val oldTotalCredits = this._userStateModel.totalCreditsAsReal

    chargingService.processResourceEvent(
      rcEvent.getReceivedMillis,
      rcEvent,
      this._userStateModel,
      aquarium.currentResourceMapping,
      true
    )

    val newTotalCredits = this._userStateModel.totalCreditsAsReal

    // The original test (`old.signum * new.signum < 0`) is blind to a balance that starts
    // or ends at exactly zero, e.g. 0 -> negative. Compare the same predicate that is
    // published in the event instead.
    val wasNonNegative = oldTotalCredits >= 0
    val isNonNegative = newTotalCredits >= 0
    if(wasNonNegative != isNonNegative) {
      aquarium.eventBus ! new BalanceEvent(userID, isNonNegative)
    }

    DEBUG("Updated %s", this._userStateModel)
  }

  /**
   * Replies with the bill of the requested timeslot, or with a 500 response if anything
   * goes wrong while building it.
   */
  def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
    checkInitial()

    try {
      val timeslot = event.timeslot
      // Resource types come from the first policy model loaded for the timeslot, if any.
      val resourceTypes: Map[String, ResourceType] =
        aquarium.policyStore.
          loadSortedPolicyModelsWithin(timeslot.from.getTime, timeslot.to.getTime).
          values.headOption match {
            case None ⇒ Map[String, ResourceType]()
            case Some(policy: PolicyModel) ⇒ policy.resourceTypesMap
          }
      val state = if(haveUserState) Some(this._userStateModel.userStateMsg) else None
      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot, this.userID, state, resourceTypes)
      val billData = GetUserBillResponseData(this.userID, billEntryMsg)
      sender ! GetUserBillResponse(Right(billData))
    } catch {
      case e: Exception ⇒
        // Go through the actor's logger instead of dumping the stack trace to stderr.
        ERROR(e, "Could not compute bill: %s(%s)", shortClassNameOf(e), e.getMessage)
        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
    }
  }

  /** Replies with the user's total credits after a realtime recalculation. */
  def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
    checkInitial()

    val userID = event.userID

    // NOTE(review): both predicates are currently the same test, so only the
    // `(true, true)` and `(false, false)` cases are reachable.
    (haveAgreements, haveUserState) match {
      case (true, true) ⇒
        // (User CREATEd, with balance state)
        calculateRealtimeUserStateNow()

        sender ! GetUserBalanceResponse(
          Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))

      case (true, false) ⇒
        // (User CREATEd, no balance state)
        // Return the default initial balance
        sender ! GetUserBalanceResponse(
          Right(
            GetUserBalanceResponseData(
              this.userID,
              initialBalanceFromCreationEvent.toString()
            )))

      case (false, true) ⇒
        // (Not CREATEd, with balance state)
        // Clearly this is internal error
        sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)

      case (false, false) ⇒
        // (Not CREATEd, no balance state)
        // The user is completely unknown
        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
    }
  }

  /** Replies with the user state message after a realtime recalculation, or 404 without state. */
  def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      calculateRealtimeUserStateNow()

      sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
    }
    else {
      sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
    }
  }

  /** Replies with total credits and wallet entries after a realtime recalculation, or 404. */
  def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      DEBUG("haveWorkingUserState: %s", event)
      calculateRealtimeUserStateNow()

      sender ! GetUserWalletResponse(
        Right(
          GetUserWalletResponseData(
            this.userID,
            this._userStateModel.totalCredits,
            MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
          )))
    }
    else {
      DEBUG("!haveWorkingUserState: %s", event)
      if(haveAgreements) {
        DEBUG("haveAgreements: %s", event)
        sender ! GetUserWalletResponse(
          Right(
            GetUserWalletResponseData(
              this.userID,
              Real.toMsgField(initialBalanceFromCreationEvent),
              MessageFactory.newWalletEntriesMsg()
            )))
      }
      else {
        DEBUG("!haveUserCreationIMEvent: %s", event)
        // Error code normalized (was "AQU-WAL-00 8") to match the other AQU-xxx-NNNN codes.
        sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
      }
    }
  }

  /**
   * Initializes the actor's internal state.
   *
   * @param userID the user this actor will serve
   */
  def onSetUserActorUserID(userID: String): Unit = {
    // Only the initial model (zero credits, created now) is installed here; the agreement
    // history is replayed later, by `checkInitial`, when the first message is handled.
    this._userStateModel = ModelFactory.newInitialUserStateModel(
      userID,
      Real(0),
      TimeHelpers.nowMillis()
    )

    require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
  }

  /** User ID for log prefixes; `"???"` while no user state model is installed. */
  private[this] def D_userID = {
    if(haveUserState) userID else "???"
  }

  private[this] def DEBUG(fmt: String, args: Any*) =
    logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def INFO(fmt: String, args: Any*) =
    logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def WARN(fmt: String, args: Any*) =
    logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def ERROR(fmt: String, args: Any*) =
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
}