WIP: IMEventModel end-to-end chain
authorChristos KK Loverdos <loverdos@gmail.com>
Thu, 3 May 2012 10:22:56 +0000 (13:22 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Thu, 3 May 2012 10:22:56 +0000 (13:22 +0300)
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/user/UserState.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

index 865aa02..0c2ffee 100644 (file)
@@ -70,7 +70,7 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   override protected def onThrowable(t: Throwable, message: AnyRef) = {
-    ERROR("Oops!\n", chainOfCauses(t).map("!! " + _) mkString "\n")
+    logChainOfCauses(t)
     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
 
     _shutmedown()
@@ -89,36 +89,40 @@ class UserActor extends ReflectiveRoleableActor {
     (this._userID ne null) && (this._userState ne null)
   }
 
+  private[this] def _havePartialState = {
+    (this._userID ne null) && (this._userState eq null)
+  }
+
+
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
-  private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = {
+  private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
     // FIXME: Implement based on the role
     "default"
   }
 
   private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
-    val userID = imEvent.userID
-    this._userID = userID
+    this._userID = imEvent.userID
 
     val store = _configurator.storeProvider.userStateStore
     // try find user state. normally should ot exist
-    val latestUserStateOpt = store.findLatestUserStateByUserID(userID)
+    val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
     if(latestUserStateOpt.isDefined) {
       logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
-        userID,
+        this._userID,
         shortClassNameOf(imEvent),
         imEvent.eventType))
 
       return
     }
 
-    val initialAgreementName = _computeAgreementForNewUser(imEvent)
+    val initialAgreementName = _getAgreementNameForNewUser(imEvent)
     val newUserState    = DefaultUserStateComputations.createInitialUserState(
-      userID,
+      this._userID,
       imEvent.occurredMillis,
       imEvent.isActive,
       0.0,
@@ -148,7 +152,48 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
+    val now = TimeHelpers.nowMillis()
+
     val imEvent = event.imEvent
+    // If we already have a userID but it does not match the incoming userID, then this is an internal error
+    if(_havePartialState && (this._userID != imEvent.userID)) {
+      throw new AquariumException("Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
+    }
+
+    // If we get an IMEvent without having a user state, then we query for the latest user state.
+    if(!_haveFullState) {
+      val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
+      this._userState = userStateOpt match {
+        case Some(userState) ⇒
+          userState
+
+        case None ⇒
+          val initialAgreementName = _getAgreementNameForNewUser(imEvent)
+          val initialUserState = DefaultUserStateComputations.createInitialUserState(
+            this._userID,
+            imEvent.occurredMillis,
+            imEvent.isActive,
+            0.0,
+            List(imEvent.role),
+            initialAgreementName)
+
+          DEBUG("Got initial state")
+          initialUserState
+      }
+    }
+
+    if(imEvent.isModifyUser && this._userState.isInitial) {
+      INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
+      return
+    }
+
+    if(imEvent.isCreateUser && !this._userState.isInitial) {
+      INFO("Got a '%s' but my state is not initial", imEvent.eventType)
+      return
+    }
+
+    this._userState = this._userState.modifyFromIMEvent(imEvent, now)
+
     if(imEvent.isCreateUser) {
       processCreateUser(imEvent)
     } else if(imEvent.isModifyUser) {
index e401014..c645798 100644 (file)
@@ -83,9 +83,7 @@ import org.bson.types.ObjectId
  * @param newWalletEntries
  *          The wallet entries computed. Not all user states need to holds wallet entries,
  *          only those that refer to billing periods (end of billing period).
- * @param lastChangeReasonCode
- *          The code for the `lastChangeReason`.
- * @param lastChangeReason
+  * @param lastChangeReason
  *          The [[gr.grnet.aquarium.user.UserStateChangeReason]] for which the usr state has changed.
  * @param totalEventsProcessedCounter
  * @param parentUserStateId
@@ -97,6 +95,7 @@ import org.bson.types.ObjectId
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 case class UserState(
+    isInitial: Boolean,
     userID: String,
 
     userCreationMillis: Long,
@@ -165,7 +164,6 @@ case class UserState(
     rolesSnapshot: RolesSnapshot,
     ownedResourcesSnapshot: OwnedResourcesSnapshot,
     newWalletEntries: List[NewWalletEntry],
-    lastChangeReasonCode: UserStateChangeReasonCodes.ChangeReasonCode,
     // The last known change reason for this userState
     lastChangeReason: UserStateChangeReason = NoSpecificChangeReason,
     totalEventsProcessedCounter: Long = 0L,
@@ -228,15 +226,18 @@ case class UserState(
   }
   
   def copyForChangeReason(changeReason: UserStateChangeReason) = {
-    this.copy(lastChangeReasonCode = changeReason.code, lastChangeReason = changeReason)
+    this.copy(lastChangeReason = changeReason)
   }
 
   def resourcesMap = ownedResourcesSnapshot.toResourcesMap
 
   def modifyFromIMEvent(imEvent: IMEventModel, snapshotMillis: Long): UserState = {
+    val changeReason = IMEventArrival(imEvent)
     this.copy(
+      isInitial = false,
       activeStateSnapshot = ActiveStateSnapshot(imEvent.isActive, snapshotMillis),
-      rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis)
+      rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis),
+      lastChangeReason = changeReason
     )
   }
 
@@ -329,14 +330,14 @@ sealed trait UserStateChangeReason {
 object UserStateChangeReasonCodes {
   type ChangeReasonCode = Int
 
-  final val InitialCalculationCode = 1
+  final val InitialSetupCode       = 1
   final val NoSpecificChangeCode   = 2
   final val MonthlyBillingCode     = 3
   final val RealtimeBillingCode    = 4
-  final val IMEventArrivalCode   = 5
+  final val IMEventArrivalCode     = 5
 }
 
-case object InitialUserStateCalculation extends UserStateChangeReason {
+case object InitialUserStateSetup extends UserStateChangeReason {
   def shouldStoreUserState = true
 
   def shouldStoreCalculatedWalletEntries = false
@@ -345,7 +346,7 @@ case object InitialUserStateCalculation extends UserStateChangeReason {
 
   def calculateCreditsForImplicitlyTerminated = false
 
-  def code = UserStateChangeReasonCodes.InitialCalculationCode
+  def code = UserStateChangeReasonCodes.InitialSetupCode
 }
 /**
  * A calculation made for no specific reason. Can be for testing, for example.
index 52a6f3a..e00a37e 100644 (file)
@@ -62,6 +62,7 @@ class UserStateComputations extends Loggable {
     val now = userCreationMillis
 
     UserState(
+      true,
       userId,
       userCreationMillis,
       0L,
@@ -79,8 +80,7 @@ class UserStateComputations extends Loggable {
       RolesSnapshot(roleNames, now),
       OwnedResourcesSnapshot(Nil, now),
       Nil,
-      UserStateChangeReasonCodes.InitialCalculationCode,
-      InitialUserStateCalculation
+      InitialUserStateSetup
     )
   }
 
@@ -136,7 +136,7 @@ class UserStateComputations extends Loggable {
       // If the user did not exist for this billing month, piece of cake
       clog.debug("User did not exist before %s", userCreationDateCalc)
 
-      // NOTE: Reason here will be: InitialUserStateCalculation
+      // NOTE: Reason here will be: InitialUserStateSetup$
       val initialUserState0 = createInitialUserStateFrom(currentUserState)
       val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
 
index b81cc6d..d318056 100644 (file)
@@ -262,7 +262,6 @@ aquariumpolicy:
     val parentId = userState.parentUserStateId
     val credits = userState.creditsSnapshot.creditAmount
     val newWalletEntries = userState.newWalletEntries.map(_.toDebugString)
-    val changeReasonCode = userState.lastChangeReasonCode
     val changeReason = userState.lastChangeReason
     val implicitlyIssued = userState.implicitlyIssuedSnapshot.implicitlyIssuedEvents.map(_.toDebugString())
     val latestResourceEvents = userState.latestResourceEventsSnapshot.resourceEvents.map(_.toDebugString())
@@ -270,7 +269,6 @@ aquariumpolicy:
     clog.debug("_id = %s", id)
     clog.debug("parentId = %s", parentId)
     clog.debug("credits = %s", credits)
-    clog.debug("changeReasonCode = %s", changeReasonCode)
     clog.debug("changeReason = %s", changeReason)
     clog.debugSeq("implicitlyIssued", implicitlyIssued, 0)
     clog.debugSeq("latestResourceEvents", latestResourceEvents, 0)