X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/c11b8ebcbf64910d7c6725b3cdc8b733363c2eda..684875dcd6fe25463f1c5c8c2ca637caf01d400d:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 12a2c08..b31ea37 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -39,15 +39,39 @@ package user import gr.grnet.aquarium.actor._ -import akka.config.Supervision.Temporary -import gr.grnet.aquarium.Aquarium -import gr.grnet.aquarium.util.{shortClassNameOf, shortNameOfClass} import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent} -import gr.grnet.aquarium.computation.data.IMStateSnapshot +import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded} +import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.service.event.BalanceEvent import gr.grnet.aquarium.event.model.im.IMEventModel -import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded} -import gr.grnet.aquarium.computation.NewUserState -import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} +import message._ +import config.AquariumPropertiesLoaded +import config.InitializeUserActorState +import event.ProcessIMEvent +import event.ProcessResourceEvent +import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} +import gr.grnet.aquarium.{Aquarium, AquariumInternalError} +import gr.grnet.aquarium.computation.BillingMonthInfo +import gr.grnet.aquarium.charging.state.UserStateBootstrap +import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel} +import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason} +import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement} +import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel} +import message.GetUserBalanceRequest +import message.GetUserBalanceResponse +import message.GetUserBalanceResponseData +import message.GetUserStateRequest +import message.GetUserStateResponse +import message.GetUserWalletRequest +import message.GetUserWalletResponse +import message.GetUserWalletResponseData +import scala.Left +import gr.grnet.aquarium.charging.state.WorkingAgreementHistory +import scala.Some +import scala.Right +import gr.grnet.aquarium.policy.StdUserAgreement +import gr.grnet.aquarium.charging.state.UserStateBootstrap +import gr.grnet.aquarium.charging.bill.BillEntry /** * @@ -56,85 +80,196 @@ import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequ class UserActor extends ReflectiveRoleableActor { private[this] var _userID: String = "" - private[this] var _imState: IMStateSnapshot = _ -// private[this] var _userState: UserState = _ - private[this] var _newUserState: NewUserState = _ + private[this] var _workingUserState: WorkingUserState = _ + private[this] var _userCreationIMEvent: IMEventModel = _ + private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory + private[this] var _latestIMEventID: String = "" + private[this] var _latestResourceEventID: String = "" + private[this] var _userStateBootstrap: UserStateBootstrap = _ + + def unsafeUserID = { + if(!haveUserID) { + throw new AquariumInternalError("%s not initialized") + } - self.lifeCycle = Temporary + this._userID + } - private[this] def _shutmedown(): Unit = { - if(_haveUserState) { - UserActorCache.invalidate(_userID) - } + override def postStop() { + DEBUG("I am finally stopped (in postStop())") + aquarium.akkaService.notifyUserActorPostStop(this) + } - self.stop() + private[this] def shutmedown(): Unit = { + if(haveUserID) { + aquarium.akkaService.invalidateUserActor(this) + } } override protected def onThrowable(t: Throwable, message: AnyRef) = { - logChainOfCauses(t) + LogHelpers.logChainOfCauses(logger, t) ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage) - _shutmedown() + shutmedown() } def role = UserActorRole - private[this] def aquarium: Aquarium = Aquarium.Instance - - private[this] def _timestampTheshold = - aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000) + private[this] def chargingService = aquarium.chargingService + private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ { + aquarium.userStateStore.insertUserState(userState) + } - private[this] def _haveUserState = { - this._newUserState ne null + @inline private[this] def haveUserID = { + this._userID ne null } - private[this] def _haveIMState = { - this._imState ne null + @inline private[this] def haveUserCreationIMEvent = { + this._userCreationIMEvent ne null } def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } - def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { + @inline private[this] def haveAgreements = { + this._workingAgreementHistory.size > 0 } - private[this] def createIMState(userID: String): Unit = { - val store = aquarium.imEventStore - // TODO: Optimization: Since IMState only records roles, we should incrementally - // TODO: built it only for those IMEvents that changed the role. - store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒ - logger.debug("Replaying %s".format(imEvent)) + @inline private[this] def haveWorkingUserState = { + this._workingUserState ne null + } - val newState = this._imState match { - case null ⇒ - IMStateSnapshot.initial(imEvent) + @inline private[this] def haveUserStateBootstrap = { + this._userStateBootstrap ne null + } - case currentState ⇒ - currentState.copyWithEvent(imEvent) + private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = { + if(imEvent.isCreateUser) { + if(haveUserCreationIMEvent) { + throw new AquariumInternalError( + "Got user creation event (id=%s) but I already have one (id=%s)", + this._userCreationIMEvent.id, + imEvent.id + ) } - this._imState = newState + this._userCreationIMEvent = imEvent } - DEBUG("Recomputed %s = %s", shortNameOfClass(classOf[IMStateSnapshot]), this._imState) + val effectiveFromMillis = imEvent.occurredMillis + val role = imEvent.role + // calling unsafe just for the side-effect + assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis)) + + val newAgreement = StdUserAgreement( + imEvent.id, + Some(imEvent.id), + effectiveFromMillis, + Long.MaxValue, + role, + PolicyDefinedFullPriceTableRef + ) + + this._workingAgreementHistory += newAgreement } - private[this] def createUserState(userID: String): Unit = { + private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = { + this._latestIMEventID = imEvent.id } - def onInitializeUserState(event: InitializeUserState): Unit = { - val userID = event.userID - this._userID = userID - DEBUG("Got %s", event) + private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = { + this._latestResourceEventID = rcEvent.id + } + + /** + * Creates the initial state that is related to IMEvents. + */ + private[this] def initializeStateOfIMEvents(): Unit = { + // NOTE: this._userID is already set up by onInitializeUserActorState() + aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒ + DEBUG("Replaying %s", imEvent) + + updateAgreementHistoryFrom(imEvent) + updateLatestIMEventIDFrom(imEvent) + } + + if(haveAgreements) { + DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString) + logSeparator() + } + } + + /** + * Resource events are processed only if the user has been created and has agreements. + * Otherwise nothing can be computed. + */ + private[this] def shouldProcessResourceEvents: Boolean = { + haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap + } + + private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = { + assert(this.haveAgreements, "this.haveAgreements") + assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent") + + val userCreationMillis = this._userCreationIMEvent.occurredMillis + val userCreationRole = this._userCreationIMEvent.role // initial role + val userCreationIMEventID = this._userCreationIMEvent.id + + if(!haveUserStateBootstrap) { + this._userStateBootstrap = UserStateBootstrap( + this._userID, + userCreationMillis, + aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)), + aquarium.initialUserBalance(userCreationRole, userCreationMillis) + ) + } + + val now = TimeHelpers.nowMillis() + this._workingUserState = chargingService.replayMonthChargingUpTo( + BillingMonthInfo.fromMillis(now), + now, + this._userStateBootstrap, + aquarium.currentResourceTypesMap, + InitialUserActorSetup(), + aquarium.userStateStore.insertUserState + ) + + // Final touch: Update agreement history in the working user state. + // The assumption is that all agreement changes go via IMEvents, so the + // state this._workingAgreementHistory is always the authoritative source. + if(haveWorkingUserState) { + this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory) + DEBUG("Computed working user state %s", this._workingUserState.toJsonString) + } + } + + private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = { + if(!this.haveAgreements) { + DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event) + return + } + + if(!this.haveUserCreationIMEvent) { + DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event) + return + } + + // We will also need this functionality when receiving IMEvents, so we place it in a method + loadWorkingUserStateAndUpdateAgreementHistory() - createIMState(userID) - createUserState(userID) + if(haveWorkingUserState) { + DEBUG("Initial working user state %s", this._workingUserState.toJsonString) + logSeparator() + } } - private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = { - // FIXME: Implement based on the role - "default" + def onInitializeUserActorState(event: InitializeUserActorState): Unit = { + this._userID = event.userID + DEBUG("Got %s", event) + + initializeStateOfIMEvents() + initializeStateOfResourceEvents(event) } /** @@ -144,46 +279,246 @@ class UserActor extends ReflectiveRoleableActor { */ def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = { val imEvent = processEvent.imEvent + val hadUserCreationIMEvent = haveUserCreationIMEvent - if(!_haveIMState) { - // This is an error. Should have been initialized from somewhere ... - throw new Exception("Got %s while being uninitialized".format(processEvent)) + if(!haveAgreements) { + // This IMEvent has arrived after any ResourceEvents + INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString) + initializeStateOfIMEvents() + } + else { + if(this._latestIMEventID == imEvent.id) { + // This happens when the actor is brought to life, then immediately initialized, and then + // sent the first IM event. But from the initialization procedure, this IM event will have + // already been loaded from DB! + INFO("Ignoring first %s", imEvent.toDebugString) + logSeparator() + + //this._latestIMEventID = imEvent.id + return + } + if(imEvent.isAddCredits) { + if(!hadUserCreationIMEvent && haveUserCreationIMEvent) + loadWorkingUserStateAndUpdateAgreementHistory() + onHandleAddCreditsEvent(imEvent) + + } else { + updateAgreementHistoryFrom(imEvent) + updateLatestIMEventIDFrom(imEvent) + } } - if(this._imState.latestIMEvent.id == imEvent.id) { - // This happens when the actor is brought to life, then immediately initialized, and then - // sent the first IM event. But from the initialization procedure, this IM event will have - // already been loaded from DB! - DEBUG("Ignoring first %s after birth", imEvent.toDebugString) - return + // Must also update user state if we know when in history the life of a user begins + if(!hadUserCreationIMEvent && haveUserCreationIMEvent) { + INFO("Processing user state, since we had a CREATE IMEvent") + loadWorkingUserStateAndUpdateAgreementHistory() } - this._imState = this._imState.copyWithEvent(imEvent) + logSeparator() + } - DEBUG("Update %s = %s", shortClassNameOf(this._imState), this._imState) + /* Convert astakos message for adding credits + to a regular RESOURCE message */ + def onHandleAddCreditsEvent(imEvent : IMEventModel) = { + val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble + val event = new StdResourceEvent( + imEvent.id, + imEvent.occurredMillis, + imEvent.receivedMillis, + imEvent.userID, + imEvent.clientID, + imEvent.eventType, + imEvent.eventType, + credits, + imEvent.eventVersion, + imEvent.details + ) + //Console.err.println("Event: " + event) + //Console.err.println("Total credits before: " + _workingUserState.totalCredits) + onProcessResourceEvent(new ProcessResourceEvent(event)) + //Console.err.println("Total credits after: " + _workingUserState.totalCredits) + //Console.err.println("OK.") } def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { val rcEvent = event.rcEvent - if(!_haveIMState) { - // This means the user has not been activated. So, we do not process any resource event - INFO("Not processing %s", rcEvent.toJsonString) + if(!shouldProcessResourceEvents) { + // This means the user has not been created (at least, as far as Aquarium is concerned). + // So, we do not process any resource event + DEBUG("Not processing %s", rcEvent.toJsonString) + logSeparator() + + return + } + + // Since the latest resource event per resource is recorded in the user state, + // we do not need to query the store. Just query the in-memory state. + // Note: This is a similar situation with the first IMEvent received right after the user + // actor is created. + if(this._latestResourceEventID == rcEvent.id) { + INFO("Ignoring first %s", rcEvent.toDebugString) + logSeparator() + return } + + val now = TimeHelpers.nowMillis() + val currentResourcesMap = aquarium.currentResourceTypesMap + val chargingReason = RealtimeChargingReason(None, now) + + val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now) + val nowYear = nowBillingMonthInfo.year + val nowMonth = nowBillingMonthInfo.month + + val eventOccurredMillis = rcEvent.occurredMillis + val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis) + val eventYear = eventBillingMonthInfo.year + val eventMonth = eventBillingMonthInfo.month + + def computeBatch(): Unit = { + DEBUG("Going for out of sync charging") + this._workingUserState = chargingService.replayMonthChargingUpTo( + nowBillingMonthInfo, + // Take into account that the event may be out-of-sync. + // TODO: Should we use this._latestResourceEventOccurredMillis instead of now? + now max eventOccurredMillis, + this._userStateBootstrap, + currentResourcesMap, + chargingReason, + stdUserStateStoreFunc + ) + + updateLatestResourceEventIDFrom(rcEvent) + } + + def computeRealtime(): Unit = { + DEBUG("Going for in sync charging") + chargingService.processResourceEvent( + rcEvent, + this._workingUserState, + chargingReason, + nowBillingMonthInfo, + true + ) + + updateLatestResourceEventIDFrom(rcEvent) + } + + var oldTotalCredits = this._workingUserState.totalCredits + // FIXME check these + if(nowYear != eventYear || nowMonth != eventMonth) { + DEBUG( + "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)", + nowYear, eventYear, + nowMonth, eventMonth + ) + computeBatch() + } + else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) { + DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis") + DEBUG( + "%s < %s", + TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis), + TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis) + ) + computeRealtime() + } + else { + computeBatch() + } + var newTotal = this._workingUserState.totalCredits + if(oldTotalCredits * newTotal < 0) + aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID, + newTotal>=0) + DEBUG("Updated %s", this._workingUserState) + logSeparator() } + def onGetUserBillRequest(event: GetUserBillRequest): Unit = { + try{ + val timeslot = event.timeslot + val state= if(haveWorkingUserState) Some(this._workingUserState) else None + val billEntry = BillEntry.fromWorkingUserState(timeslot,this._userID,state) + val billData = GetUserBillResponseData(this._userID,billEntry) + sender ! GetUserBillResponse(Right(billData)) + } catch { + case e:Exception => + e.printStackTrace() + sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500) + } + } def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { - val userId = event.userID - // FIXME: Implement -// self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount)) + val userID = event.userID + + (haveUserCreationIMEvent, haveWorkingUserState) match { + case (true, true) ⇒ + // (User CREATEd, with balance state) + sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits))) + + case (true, false) ⇒ + // (User CREATEd, no balance state) + // Return the default initial balance + sender ! GetUserBalanceResponse( + Right( + GetUserBalanceResponseData( + this._userID, + aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis) + ))) + + case (false, true) ⇒ + // (Not CREATEd, with balance state) + // Clearly this is internal error + sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500) + + case (false, false) ⇒ + // (Not CREATEd, no balance state) + // The user is completely unknown + sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/) + } } def onGetUserStateRequest(event: GetUserStateRequest): Unit = { - val userId = event.userID - // FIXME: Implement -// self reply GetUserStateResponse(userId, Right(this._userState)) + haveWorkingUserState match { + case true ⇒ + sender ! GetUserStateResponse(Right(this._workingUserState)) + + case false ⇒ + sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404) + } + } + + def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = { + haveWorkingUserState match { + case true ⇒ + DEBUG("haveWorkingUserState: %s", event) + sender ! GetUserWalletResponse( + Right( + GetUserWalletResponseData( + this._userID, + this._workingUserState.totalCredits, + this._workingUserState.walletEntries.toList + ))) + + case false ⇒ + DEBUG("!haveWorkingUserState: %s", event) + haveUserCreationIMEvent match { + case true ⇒ + DEBUG("haveUserCreationIMEvent: %s", event) + sender ! GetUserWalletResponse( + Right( + GetUserWalletResponseData( + this._userID, + aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis), + Nil + ))) + + case false ⇒ + DEBUG("!haveUserCreationIMEvent: %s", event) + sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404) + } + } } private[this] def D_userID = { @@ -191,17 +526,17 @@ class UserActor extends ReflectiveRoleableActor { } private[this] def DEBUG(fmt: String, args: Any*) = - logger.debug("User[%s]: %s".format(D_userID, fmt.format(args: _*))) + logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*))) private[this] def INFO(fmt: String, args: Any*) = - logger.info("User[%s]: %s".format(D_userID, fmt.format(args: _*))) + logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*))) private[this] def WARN(fmt: String, args: Any*) = - logger.warn("User[%s]: %s".format(D_userID, fmt.format(args: _*))) + logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*))) private[this] def ERROR(fmt: String, args: Any*) = - logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*))) + logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*))) private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = - logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*)), t) + logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t) }