Reorg initialization seq
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 1 Oct 2012 11:40:45 +0000 (14:40 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 1 Oct 2012 11:40:45 +0000 (14:40 +0300)
src/main/avro/aquarium-user-state.avdl
src/main/scala/gr/grnet/aquarium/Aquarium.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/charging/state/UserAgreementHistoryModel.scala
src/main/scala/gr/grnet/aquarium/charging/state/UserStateModel.scala
src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/message/avro/MessageFactory.scala
src/main/scala/gr/grnet/aquarium/message/avro/MessageHelpers.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala

index 4b5c366..c961e9d 100644 (file)
@@ -83,9 +83,9 @@ protocol AquariumUserState {
   record ChargeslotMsg {
     long startMillis;
     long stopMillis;
-   string unitPrice;
+    string unitPrice;
     string explanation = "";
-   string creditsToSubtract = 0.0;
+    string creditsToSubtract = 0.0;
   }
 
   // The following equation must hold (within a very small error):
index c372511..af255de 100644 (file)
@@ -411,7 +411,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
    * @param imEvent       The IMEvent that creates the user
    */
   def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
-    require(MessageHelpers.isIMEventCreate(imEvent))
+    require(MessageHelpers.isUserCreationIMEvent(imEvent))
 
     val role = imEvent.getRole
     val referenceTimeMillis = imEvent.getOccurredMillis
index 59b52f2..12006a1 100644 (file)
@@ -136,7 +136,7 @@ class UserActor extends ReflectiveRoleableActor {
       _imcounter += 1
       DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
 
-      if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
+      if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
         // The very first event must be a CREATE event. Otherwise we abort initialization.
         // This will normally happen during devops :)
         INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
@@ -176,20 +176,28 @@ class UserActor extends ReflectiveRoleableActor {
     )
   }
 
-  private[this] def loadLastKnownUserStateAndUpdateAgreements(historyMsg: UserAgreementHistoryMsg) {
-    val userID = historyMsg.getUserID
-    val latestUserStateOpt = aquarium.userStateStore.findLatestUserState(userID)
-    latestUserStateOpt match {
+  private[this] def loadLastKnownUserStateAndUpdateAgreements() {
+    val userID = this._userStateModel.userID
+    aquarium.userStateStore.findLatestUserState(userID) match {
       case None ⇒
         // First user state ever
         saveFirstUserState(userID)
 
       case Some(latestUserState) ⇒
         this._userStateModel.updateUserStateMsg(latestUserState)
-        this._userStateModel.updateUserAgreementHistoryMsg(historyMsg)
     }
   }
 
+  private[this] def processResourceEventsAfterLastKnownUserState() {
+    // Update the user state snapshot with fresh (ie not previously processed) events.
+
+  }
+
+  private[this] def makeUserStateMsgUpToDate() {
+    loadLastKnownUserStateAndUpdateAgreements()
+    processResourceEventsAfterLastKnownUserState()
+  }
+
   private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
     if(!isInitial) {
       return false
@@ -198,7 +206,7 @@ class UserActor extends ReflectiveRoleableActor {
     val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID)
 
     if(userCreated) {
-      loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg)
+      makeUserStateMsgUpToDate()
     }
 
     nextThing()
@@ -236,8 +244,8 @@ class UserActor extends ReflectiveRoleableActor {
     this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
     this._imMsgCount += 1
 
-    if(haveUserCreationEvent) {
-      loadLastKnownUserStateAndUpdateAgreements(this._userStateModel.userAgreementHistoryMsg)
+    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
+      makeUserStateMsgUpToDate()
     }
 
     DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
index f56b0e1..86bf7b4 100644 (file)
@@ -68,7 +68,7 @@ final class UserAgreementHistoryModel(val userAgreementHistoryMsg: UserAgreement
   }
 
   private[this] def checkUserCreationIMEvent(imEvent: IMEventMsg) {
-    if(MessageHelpers.isIMEventCreate(imEvent)) {
+    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
       this._userCreationIMEventMsgOpt = Some(imEvent)
     }
   }
index df5fda0..1f7b0f5 100644 (file)
@@ -78,7 +78,7 @@ final class UserStateModel(
   }
 
   private[this] def checkUserCreationIMEvent(imEvent: IMEventMsg) {
-    if(MessageHelpers.isIMEventCreate(imEvent)) {
+    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
       this._userCreationIMEventMsgOpt = Some(imEvent)
     }
   }
index c08f06e..882e307 100644 (file)
@@ -137,7 +137,7 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
             val userID = imEvent.getUserID
             val createIMEventOpt = store.findCreateIMEventByUserID(userID)
             val userHasBeenCreated = createIMEventOpt.isDefined
-            val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
+            val isCreateUser = MessageHelpers.isUserCreationIMEvent(imEvent)
 
             (userHasBeenCreated, isCreateUser) match {
               case (true, true) ⇒
index d84e25d..7e5fa92 100644 (file)
@@ -392,6 +392,7 @@ object MessageFactory {
     msg.setStateOfResources(newJMap)
     msg.setWalletEntries(newJList)
     msg.setUserAgreementHistory(newUserAgreementHistoryMsg(userID))
+    msg.setStateOfResources(newJMap)
     msg
   }
 }
index 67f2431..9b84594 100644 (file)
@@ -96,11 +96,11 @@ final object MessageHelpers {
     !isOccurredWithinMillis(event, billingStartMillis, billingStopMillis)
   }
 
-  @inline final def isIMEventCreate(msg: IMEventMsg) = {
+  @inline final def isUserCreationIMEvent(msg: IMEventMsg) = {
     msg.getEventType().toLowerCase() == MessageConstants.IMEventMsg.EventTypes.create
   }
 
-  @inline final def isIMEventModify(msg: IMEventMsg) = {
+  @inline final def isUserModificationIMEvent(msg: IMEventMsg) = {
     msg.getEventType().toLowerCase() == MessageConstants.IMEventMsg.EventTypes.modify
   }
 
@@ -325,7 +325,7 @@ final object MessageHelpers {
     agreement.getRelatedIMEventMsg match {
       case null ⇒
       case imEvent ⇒
-        if(isIMEventCreate(imEvent)) {
+        if(isUserCreationIMEvent(imEvent)) {
           history.setUserCreationTimeMillis(imEvent.getOccurredMillis)
         }
     }
index f919fa8..9f0a66a 100644 (file)
@@ -180,7 +180,7 @@ extends StoreProvider
    */
   def findCreateIMEventByUserID(userID: String) = {
     _imEvents.find { event ⇒
-      event.getUserID() == userID && MessageHelpers.isIMEventCreate(event)
+      event.getUserID() == userID && MessageHelpers.isUserCreationIMEvent(event)
     }
   }