From: Christos KK Loverdos Date: Thu, 3 May 2012 10:22:56 +0000 (+0300) Subject: WIP: IMEventModel end-to-end chain X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/ed6cccb26cfd66eb68f2cee1b22137e503081475?ds=sidebyside WIP: IMEventModel end-to-end chain --- diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 865aa02..0c2ffee 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -70,7 +70,7 @@ class UserActor extends ReflectiveRoleableActor { } override protected def onThrowable(t: Throwable, message: AnyRef) = { - ERROR("Oops!\n", chainOfCauses(t).map("!! " + _) mkString "\n") + logChainOfCauses(t) ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage) _shutmedown() @@ -89,36 +89,40 @@ class UserActor extends ReflectiveRoleableActor { (this._userID ne null) && (this._userState ne null) } + private[this] def _havePartialState = { + (this._userID ne null) && (this._userState eq null) + } + + def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { } - private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = { + private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = { // FIXME: Implement based on the role "default" } private[this] def processCreateUser(imEvent: IMEventModel): Unit = { - val userID = imEvent.userID - this._userID = userID + this._userID = imEvent.userID val store = _configurator.storeProvider.userStateStore // try find user state. normally should ot exist - val latestUserStateOpt = store.findLatestUserStateByUserID(userID) + val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID) if(latestUserStateOpt.isDefined) { logger.error("Got %s(%s, %s) but user already exists. Ingoring".format( - userID, + this._userID, shortClassNameOf(imEvent), imEvent.eventType)) return } - val initialAgreementName = _computeAgreementForNewUser(imEvent) + val initialAgreementName = _getAgreementNameForNewUser(imEvent) val newUserState = DefaultUserStateComputations.createInitialUserState( - userID, + this._userID, imEvent.occurredMillis, imEvent.isActive, 0.0, @@ -148,7 +152,48 @@ class UserActor extends ReflectiveRoleableActor { } def onProcessIMEvent(event: ProcessIMEvent): Unit = { + val now = TimeHelpers.nowMillis() + val imEvent = event.imEvent + // If we already have a userID but it does not match the incoming userID, then this is an internal error + if(_havePartialState && (this._userID != imEvent.userID)) { + throw new AquariumException("Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID)) + } + + // If we get an IMEvent without having a user state, then we query for the latest user state. + if(!_haveFullState) { + val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID) + this._userState = userStateOpt match { + case Some(userState) ⇒ + userState + + case None ⇒ + val initialAgreementName = _getAgreementNameForNewUser(imEvent) + val initialUserState = DefaultUserStateComputations.createInitialUserState( + this._userID, + imEvent.occurredMillis, + imEvent.isActive, + 0.0, + List(imEvent.role), + initialAgreementName) + + DEBUG("Got initial state") + initialUserState + } + } + + if(imEvent.isModifyUser && this._userState.isInitial) { + INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create) + return + } + + if(imEvent.isCreateUser && !this._userState.isInitial) { + INFO("Got a '%s' but my state is not initial", imEvent.eventType) + return + } + + this._userState = this._userState.modifyFromIMEvent(imEvent, now) + if(imEvent.isCreateUser) { processCreateUser(imEvent) } else if(imEvent.isModifyUser) { diff --git a/src/main/scala/gr/grnet/aquarium/user/UserState.scala b/src/main/scala/gr/grnet/aquarium/user/UserState.scala index e401014..c645798 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserState.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserState.scala @@ -83,9 +83,7 @@ import org.bson.types.ObjectId * @param newWalletEntries * The wallet entries computed. Not all user states need to holds wallet entries, * only those that refer to billing periods (end of billing period). - * @param lastChangeReasonCode - * The code for the `lastChangeReason`. - * @param lastChangeReason + * @param lastChangeReason * The [[gr.grnet.aquarium.user.UserStateChangeReason]] for which the usr state has changed. * @param totalEventsProcessedCounter * @param parentUserStateId @@ -97,6 +95,7 @@ import org.bson.types.ObjectId * @author Christos KK Loverdos */ case class UserState( + isInitial: Boolean, userID: String, userCreationMillis: Long, @@ -165,7 +164,6 @@ case class UserState( rolesSnapshot: RolesSnapshot, ownedResourcesSnapshot: OwnedResourcesSnapshot, newWalletEntries: List[NewWalletEntry], - lastChangeReasonCode: UserStateChangeReasonCodes.ChangeReasonCode, // The last known change reason for this userState lastChangeReason: UserStateChangeReason = NoSpecificChangeReason, totalEventsProcessedCounter: Long = 0L, @@ -228,15 +226,18 @@ case class UserState( } def copyForChangeReason(changeReason: UserStateChangeReason) = { - this.copy(lastChangeReasonCode = changeReason.code, lastChangeReason = changeReason) + this.copy(lastChangeReason = changeReason) } def resourcesMap = ownedResourcesSnapshot.toResourcesMap def modifyFromIMEvent(imEvent: IMEventModel, snapshotMillis: Long): UserState = { + val changeReason = IMEventArrival(imEvent) this.copy( + isInitial = false, activeStateSnapshot = ActiveStateSnapshot(imEvent.isActive, snapshotMillis), - rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis) + rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis), + lastChangeReason = changeReason ) } @@ -329,14 +330,14 @@ sealed trait UserStateChangeReason { object UserStateChangeReasonCodes { type ChangeReasonCode = Int - final val InitialCalculationCode = 1 + final val InitialSetupCode = 1 final val NoSpecificChangeCode = 2 final val MonthlyBillingCode = 3 final val RealtimeBillingCode = 4 - final val IMEventArrivalCode = 5 + final val IMEventArrivalCode = 5 } -case object InitialUserStateCalculation extends UserStateChangeReason { +case object InitialUserStateSetup extends UserStateChangeReason { def shouldStoreUserState = true def shouldStoreCalculatedWalletEntries = false @@ -345,7 +346,7 @@ case object InitialUserStateCalculation extends UserStateChangeReason { def calculateCreditsForImplicitlyTerminated = false - def code = UserStateChangeReasonCodes.InitialCalculationCode + def code = UserStateChangeReasonCodes.InitialSetupCode } /** * A calculation made for no specific reason. Can be for testing, for example. diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index 52a6f3a..e00a37e 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -62,6 +62,7 @@ class UserStateComputations extends Loggable { val now = userCreationMillis UserState( + true, userId, userCreationMillis, 0L, @@ -79,8 +80,7 @@ class UserStateComputations extends Loggable { RolesSnapshot(roleNames, now), OwnedResourcesSnapshot(Nil, now), Nil, - UserStateChangeReasonCodes.InitialCalculationCode, - InitialUserStateCalculation + InitialUserStateSetup ) } @@ -136,7 +136,7 @@ class UserStateComputations extends Loggable { // If the user did not exist for this billing month, piece of cake clog.debug("User did not exist before %s", userCreationDateCalc) - // NOTE: Reason here will be: InitialUserStateCalculation + // NOTE: Reason here will be: InitialUserStateSetup$ val initialUserState0 = createInitialUserStateFrom(currentUserState) val initialUserStateM = userStateStore.insertUserState2(initialUserState0) diff --git a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala index b81cc6d..d318056 100644 --- a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala +++ b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala @@ -262,7 +262,6 @@ aquariumpolicy: val parentId = userState.parentUserStateId val credits = userState.creditsSnapshot.creditAmount val newWalletEntries = userState.newWalletEntries.map(_.toDebugString) - val changeReasonCode = userState.lastChangeReasonCode val changeReason = userState.lastChangeReason val implicitlyIssued = userState.implicitlyIssuedSnapshot.implicitlyIssuedEvents.map(_.toDebugString()) val latestResourceEvents = userState.latestResourceEventsSnapshot.resourceEvents.map(_.toDebugString()) @@ -270,7 +269,6 @@ aquariumpolicy: clog.debug("_id = %s", id) clog.debug("parentId = %s", parentId) clog.debug("credits = %s", credits) - clog.debug("changeReasonCode = %s", changeReasonCode) clog.debug("changeReason = %s", changeReason) clog.debugSeq("implicitlyIssued", implicitlyIssued, 0) clog.debugSeq("latestResourceEvents", latestResourceEvents, 0)