import gr.grnet.aquarium.user._
import gr.grnet.aquarium.util.shortClassNameOf
-import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.actor.message.service.router._
import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
import gr.grnet.aquarium.event.im.IMEventModel
import akka.config.Supervision.Temporary
import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator}
+import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
/**
*/
class UserActor extends ReflectiveRoleableActor {
- private[this] var _userID: String = _
+ private[this] var _imState: IMStateSnapshot = _
private[this] var _userState: UserState = _
self.lifeCycle = Temporary
+ private[this] def _userID = _userState.userID // dereferences _userState: only call once _haveUserState is true
private[this] def _shutmedown(): Unit = {
- if(_haveFullState) {
- UserActorCache.invalidate(this._userID)
+ if(_haveUserState) {
+ UserActorCache.invalidate(_userID)
}
self.stop()
def role = UserActorRole
private[this] def _configurator: Configurator = Configurator.MasterConfigurator
-// private[this] def _userId = _userState.userId
private[this] def _timestampTheshold =
_configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
- private[this] def _haveFullState = {
- (this._userID ne null) && (this._userState ne null)
+ private[this] def _haveUserState = { // whether a UserState has been assigned to this actor
+ !(this._userState eq null)
}
- private[this] def _havePartialState = {
- (this._userID ne null) && (this._userState eq null)
+ private[this] def _haveIMState = { // whether an IMStateSnapshot has been recorded on this actor
+ !(this._imState eq null)
}
-
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
}
"default"
}
- private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
- this._userID = imEvent.userID
-
- val store = _configurator.storeProvider.userStateStore
- // try find user state. normally should ot exist
- val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
- if(latestUserStateOpt.isDefined) {
- logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
- this._userID,
- shortClassNameOf(imEvent),
- imEvent.eventType))
-
- return
- }
-
- val initialAgreementName = _getAgreementNameForNewUser(imEvent)
- val newUserState = DefaultUserStateComputations.createInitialUserState(
- this._userID,
- imEvent.occurredMillis,
- imEvent.isActive,
- 0.0,
- List(imEvent.role),
- initialAgreementName)
-
- this._userState = newUserState
-
- // FIXME: If this fails, then the actor must be shut down.
- store.insertUserState(newUserState)
- }
-
- private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
- val now = TimeHelpers.nowMillis()
-
- if(!_haveFullState) {
- ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
- _shutmedown()
- return
- }
-
- this._userState = this._userState.modifyFromIMEvent(imEvent, now)
- }
-
- def onProcessSetUserID(event: ProcessSetUserID): Unit = {
- this._userID = event.userID
- }
-
def onProcessIMEvent(event: ProcessIMEvent): Unit = {
val now = TimeHelpers.nowMillis()
val imEvent = event.imEvent
- // If we already have a userID but it does not match the incoming userID, then this is an internal error
- if(_havePartialState && (this._userID != imEvent.userID)) {
- throw new AquariumInternalError(
- "Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
- }
-
- // If we get an IMEvent without having a user state, then we query for the latest user state.
- if(!_haveFullState) {
- val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
- this._userState = userStateOpt match {
- case Some(userState) ⇒
- userState
-
- case None ⇒
- val initialAgreementName = _getAgreementNameForNewUser(imEvent)
- val initialUserState = DefaultUserStateComputations.createInitialUserState(
- this._userID,
- imEvent.occurredMillis,
- imEvent.isActive,
- 0.0,
- List(imEvent.role),
- initialAgreementName)
-
- DEBUG("Got initial state")
- initialUserState
- }
- }
+ val isUpdate = if(_haveIMState) { // true when a snapshot is already held and this event replaces it
+ val newOccurredMillis = imEvent.occurredMillis
+ val currentOccurredMillis = this._imState.imEvent.occurredMillis
- if(imEvent.isModifyUser && this._userState.isInitial) {
- INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
- return
- }
-
- if(imEvent.isCreateUser && !this._userState.isInitial) {
- INFO("Got a '%s' but my state is not initial", imEvent.eventType)
- return
- }
+ if(newOccurredMillis < currentOccurredMillis) { // strictly older only: an equal timestamp still replaces the snapshot
+ INFO(
+ "Ignoring older IMEvent: [%s] < [%s]",
+ new MutableDateCalc(newOccurredMillis).toYYYYMMDDHHMMSSSSS,
+ new MutableDateCalc(currentOccurredMillis).toYYYYMMDDHHMMSSSSS)
- this._userState = this._userState.modifyFromIMEvent(imEvent, now)
+ return // leaves onProcessIMEvent with _imState untouched
+ }
- if(imEvent.isCreateUser) {
- processCreateUser(imEvent)
- } else if(imEvent.isModifyUser) {
- processModifyUser(imEvent)
+ true
} else {
- throw new AquariumException("Cannot interpret %s".format(imEvent))
+ false // no snapshot held yet, so this event is stored unconditionally
}
+
+ this._imState = IMStateSnapshot(imEvent, now) // pairs the event with `now`, captured at method entry
+ DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
}
def onRequestUserBalance(event: RequestUserBalance): Unit = {
private[this] def D_userID = {
- if(this._userID eq null)
- "<NOT INITIALIZED>" // We always get a userID first
- else
- if(this._userState eq null)
- "%s, NO STATE".format(this._userID)
+ if(this._userState ne null)
+ this._userState.userID // the user state, when present, supplies the ID
+ else if(this._imState ne null)
+ this._imState.imEvent.userID // otherwise use the ID carried by the held IMEvent
else
- "%s".format(this._userID)
+ "<NOT INITIALIZED>"
}
private[this] def DEBUG(fmt: String, args: Any*) =
logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
- logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+ logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t) // message is prefixed with D_userID; `t` is passed through to the logger
}