X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/e8748a1390b83e46b50f060a39db8b805c6d7d3b..0a02ec0cb6ae36b1e358c949ba85acd113f1d646:/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 0ec0667..0065775 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -37,227 +37,133 @@ package gr.grnet.aquarium.actor package service package user -import java.util.Date -import com.ckkloverdos.maybe.{Failed, NoVal, Just} - import gr.grnet.aquarium.actor._ -import gr.grnet.aquarium.Configurator -import gr.grnet.aquarium.user._ - -import gr.grnet.aquarium.util.Loggable -import gr.grnet.aquarium.util.date.TimeHelpers -import gr.grnet.aquarium.logic.accounting.RoleAgreements -import gr.grnet.aquarium.messaging.AkkaAMQP -import gr.grnet.aquarium.actor.message.config.user.UserActorInitWithUserId -import gr.grnet.aquarium.actor.message.service.dispatcher._ -import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded} -import gr.grnet.aquarium.events.{WalletEntry, IMEvent} +import gr.grnet.aquarium.util.shortClassNameOf +import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded} +import akka.config.Supervision.Temporary +import gr.grnet.aquarium.Aquarium +import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc} +import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent} +import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} +import gr.grnet.aquarium.computation.data.IMStateSnapshot +import gr.grnet.aquarium.computation.UserState +import gr.grnet.aquarium.event.model.im.IMEventModel /** * * @author Christos KK Loverdos */ -class UserActor extends AquariumActor -with AkkaAMQP -with ReflectiveAquariumActor -with Loggable { - @volatile - private[this] var _userId: String = _ - @volatile +class UserActor extends ReflectiveRoleableActor { + private[this] var _imState: IMStateSnapshot = _ private[this] var _userState: UserState = _ - @volatile - private[this] var _timestampTheshold: Long = _ - private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration + self.lifeCycle = Temporary - def role = UserActorRole - - private[this] def _configurator: Configurator = Configurator.MasterConfigurator + private[this] def _userID = this._userState.userID + private[this] def _shutmedown(): Unit = { + if(_haveUserState) { + UserActorCache.invalidate(_userID) + } - /** - * Replay the event log for all events that affect the user state. - */ - def rebuildState(from: Long, to: Long): Unit = { - val start = TimeHelpers.nowMillis - if(_userState == null) - createBlankState - - //Rebuild state from user events - val usersDB = _configurator.storeProvider.userEventStore - val userEvents = usersDB.findIMEventsByUserId(_userId) - val numUserEvents = userEvents.size - _userState = replayIMEvents(_userState, userEvents, from, to) - - //Rebuild state from resource events - val eventsDB = _configurator.storeProvider.resourceEventStore - val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from) - val numResourceEvents = resourceEvents.size - // _userState = replayResourceEvents(_userState, resourceEvents, from, to) - - //Rebuild state from wallet entries - val wallet = _configurator.storeProvider.walletEntryStore - val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from)) - val numWalletEntries = walletEnties.size - _userState = replayWalletEntries(_userState, walletEnties, from, to) - - INFO(("Rebuilt state from %d events (%d user events, " + - "%d resource events, %d wallet entries) in %d msec").format( - numUserEvents + numResourceEvents + numWalletEntries, - numUserEvents, numResourceEvents, numWalletEntries, - TimeHelpers.nowMillis - start)) + self.stop() } - /** - * Create an empty state for a user - */ - def createBlankState = { - this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0) - } + override protected def onThrowable(t: Throwable, message: AnyRef) = { + logChainOfCauses(t) + ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage) - /** - * Replay user events on the provided user state - */ - def replayIMEvents(initState: UserState, events: List[IMEvent], - from: Long, to: Long): UserState = { - initState + _shutmedown() } + def role = UserActorRole - /** - * Replay wallet entries on the provided user state - */ - def replayWalletEntries(initState: UserState, events: List[WalletEntry], - from: Long, to: Long): UserState = { - initState - } + private[this] def _configurator: Aquarium = Aquarium.Instance - /** - * Persist current user state - */ - private[this] def saveUserState(): Unit = { - _configurator.storeProvider.userStateStore.storeUserState(this._userState) match { - case Just(record) => record - case NoVal => ERROR("Unknown error saving state") - case Failed(e) => - ERROR("Saving state failed: %s".format(e)); - } - } + private[this] def _timestampTheshold = + _configurator.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000) - def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { - this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000) - INFO("Setup my timestampTheshold = %s", this._timestampTheshold) - } - def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = { - this._userId = event.userId - DEBUG("Actor starting, loading state") + private[this] def _haveUserState = { + this._userState ne null } - def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { - val resourceEvent = event.rcEvent - if(resourceEvent.userID != this._userId) { - ERROR("Received %s but my userId = %s".format(event, this._userId)) - } else { - //ensureUserState() - // calcWalletEntries() - //processResourceEvent(resourceEvent, true) - } + private[this] def _haveIMState = { + this._imState ne null } - private[this] def processCreateUser(event: IMEvent): Unit = { - val userId = event.userID - DEBUG("Creating user from state %s", event) - val usersDB = _configurator.storeProvider.userStateStore - usersDB.findUserStateByUserId(userId) match { - case Just(userState) ⇒ - WARN("User already created, state = %s".format(userState)) - case failed@Failed(e) ⇒ - ERROR("[%s] %s", e.getClass.getName, e.getMessage) - case NoVal ⇒ - val agreement = RoleAgreements.agreementForRole(event.role) - DEBUG("User %s assigned agreement %s".format(userId, agreement.name)) - - this._userState = DefaultUserStateComputations.createInitialUserState( - userId, - event.occurredMillis, - event.isActive, 0.0, List(event.role), agreement.name) - saveUserState - DEBUG("Created and stored %s", this._userState) - } + def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } - private[this] def processModifyUser(event: IMEvent): Unit = { - val now = TimeHelpers.nowMillis - val newActive = ActiveStateSnapshot(event.isStateActive, now) - - DEBUG("New active status = %s".format(newActive)) - - this._userState = this._userState.copy(activeStateSnapshot = newActive) + def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { } - def onProcessUserEvent(event: ProcessUserEvent): Unit = { - val userEvent = event.imEvent - if(userEvent.userID != this._userId) { - ERROR("Received %s but my userId = %s".format(userEvent, this._userId)) - } else { - if(userEvent.isCreateUser) { - processCreateUser(userEvent) - } else if(userEvent.isModifyUser) { - processModifyUser(userEvent) - } - } + private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = { + // FIXME: Implement based on the role + "default" } - def onRequestUserBalance(event: RequestUserBalance): Unit = { - val userId = event.userId - val timestamp = event.timestamp + /** + * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s. + * When this method is called, we assume that all proper checks have been made and it + * is OK to proceed with the event processing. + */ + def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = { + val imEvent = processEvent.imEvent + val hadIMState = _haveIMState - if(TimeHelpers.nowMillis - _userState.newestSnapshotTime > 60 * 1000) { - // calcWalletEntries() - } - self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount) - } + if(hadIMState) { - def onUserRequestGetState(event: UserRequestGetState): Unit = { - val userId = event.userId - if(this._userId != userId) { - ERROR("Received %s but my userId = %s".format(event, this._userId)) - // TODO: throw an exception here + this._imState = this._imState.addMostRecentEvent(imEvent) } else { - // FIXME: implement - ERROR("FIXME: Should have properly computed the user state") - // ensureUserState() - self reply UserResponseGetState(userId, this._userState) + this._imState = IMStateSnapshot.initial(imEvent) } + + DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState) } - def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { + def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { + val userId = event.userID + // FIXME: Implement + self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount)) } - override def postStop { - DEBUG("Stopping, saving state") - saveUserState + def onGetUserStateRequest(event: GetUserStateRequest): Unit = { + val userId = event.userID + // FIXME: Implement + self reply GetUserStateResponse(userId, Right(this._userState)) } - override def preRestart(reason: Throwable) { - DEBUG("Actor failed, restarting") + def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { + val rcEvent = event.rcEvent + + logger.info("Got\n{}", rcEvent.toJsonString) } - override def postRestart(reason: Throwable) { - DEBUG("Actor restarted succesfully") + + private[this] def D_userID = { + if(this._userState eq null) + if(this._imState eq null) + "" + else + this._imState.imEvent.userID + else + this._userState.userID } private[this] def DEBUG(fmt: String, args: Any*) = - logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args: _*))) + logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*))) private[this] def INFO(fmt: String, args: Any*) = - logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args: _*))) + logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*))) private[this] def WARN(fmt: String, args: Any*) = - logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args: _*))) + logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*))) private[this] def ERROR(fmt: String, args: Any*) = - logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args: _*))) + logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*))) + + private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = + logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t) }