WIP: New state machine for message processing
author Christos KK Loverdos <loverdos@gmail.com>
Wed, 19 Sep 2012 16:06:58 +0000 (19:06 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Wed, 19 Sep 2012 16:06:58 +0000 (19:06 +0300)
src/main/scala/gr/grnet/aquarium/actor/message/config/SetUserActorUserID.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/main/scala/gr/grnet/aquarium/store/IMEventStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/config/SetUserActorUserID.scala b/src/main/scala/gr/grnet/aquarium/actor/message/config/SetUserActorUserID.scala
new file mode 100644 (file)
index 0000000..3d90040
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *  *
+ *  * Redistribution and use in source and binary forms, with or
+ *  * without modification, are permitted provided that the following
+ *  * conditions are met:
+ *  *
+ *  *   1. Redistributions of source code must retain the above
+ *  *      copyright notice, this list of conditions and the following
+ *  *      disclaimer.
+ *  *
+ *  *   2. Redistributions in binary form must reproduce the above
+ *  *      copyright notice, this list of conditions and the following
+ *  *      disclaimer in the documentation and/or other materials
+ *  *      provided with the distribution.
+ *  *
+ *  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ *  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ *  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ *  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ *  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ *  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ *  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ *  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ *  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ *  * POSSIBILITY OF SUCH DAMAGE.
+ *  *
+ *  * The views and conclusions contained in the software and
+ *  * documentation are those of the authors and should not be
+ *  * interpreted as representing official policies, either expressed
+ *  * or implied, of GRNET S.A.
+ *
+ */
+
+package gr.grnet.aquarium.actor.message.config
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class SetUserActorUserID(userID: String)
index 7d339a4..bfc95a1 100644 (file)
@@ -66,7 +66,6 @@ import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _isFirstMessage = true
   private[this] var _rcMsgCount = 0
   private[this] var _imMsgCount = 0
   private[this] var _userID: String = "<?>"
@@ -74,6 +73,7 @@ class UserActor extends ReflectiveRoleableActor {
   private[this] var _userCreationIMEvent: IMEventMsg = _
   private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
   private[this] var _latestIMEventOriginalID: String = ""
+  private[this] var _latestIMEventOccurredMillis: Long = -1L
   private[this] var _latestResourceEventOriginalID: String = ""
   private[this] var _userStateBootstrap: UserStateBootstrap = _
 
@@ -115,10 +115,6 @@ class UserActor extends ReflectiveRoleableActor {
     this._userID ne null
   }
 
-  @inline private[this] def haveUserCreationIMEvent = {
-    this._userCreationIMEvent ne null
-  }
-
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
@@ -134,18 +130,31 @@ class UserActor extends ReflectiveRoleableActor {
     this._userStateBootstrap ne null
   }
 
+  private[this] def createUserAgreementHistoryModel(imEvent: IMEventMsg) {
+    assert(MessageHelpers.isIMEventCreate(imEvent))
+    assert(this._userAgreementHistoryModel eq null)
+    assert(this._userCreationIMEvent eq null)
+
+    this._userCreationIMEvent = imEvent
+    this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(
+      imEvent,
+      imEvent.getOriginalID
+    )
+  }
+
   private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = {
     val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
     if(isCreateUser) {
-      if(haveUserCreationIMEvent) {
+      if(haveAgreements) {
         throw new AquariumInternalError(
           "Got user creation event (id=%s) but I already have one (id=%s)",
-            this._userCreationIMEvent.getOriginalID,
-            imEvent.getOriginalID
+          this._userCreationIMEvent.getOriginalID,
+          imEvent.getOriginalID
         )
       }
 
-      this._userCreationIMEvent = imEvent
+      createUserAgreementHistoryModel(imEvent) // now we have an agreement history
+      createUserStateBootstrap(imEvent)
     }
 
     val effectiveFromMillis = imEvent.getOccurredMillis
@@ -154,16 +163,22 @@ class UserActor extends ReflectiveRoleableActor {
     assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis))
 
     // add to model (will update the underlying messages as well)
-    if(this._userAgreementHistoryModel eq null) {
-      this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, imEvent.getOriginalID)
-    } else {
-      val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
-      this._userAgreementHistoryModel += newUserAgreementModel
-    }
+    val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
+    this._userAgreementHistoryModel += newUserAgreementModel
+
+    // We assume that we always call this method with in-sync events
+    assert(imEvent.getOccurredMillis >= this._latestIMEventOccurredMillis)
+    updateLatestIMEventStateFrom(imEvent)
   }
 
-  private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
+//  private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
+//    this._latestIMEventOriginalID = imEvent.getOriginalID
+//  }
+
+  private[this] def updateLatestIMEventStateFrom(imEvent: IMEventMsg) {
     this._latestIMEventOriginalID = imEvent.getOriginalID
+    this._latestIMEventOccurredMillis = imEvent.getOccurredMillis
+    this._imMsgCount += 1
   }
 
   private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = {
@@ -172,28 +187,34 @@ class UserActor extends ReflectiveRoleableActor {
 
   /**
    * Creates the initial state that is related to IMEvents.
+   *
+   * @return `true` if there was a user CREATE event
    */
-  private[this] def initializeStateOfIMEvents(): Unit = {
+  private[this] def initializeStateOfIMEvents(): Boolean = {
+    DEBUG("initializeStateOfIMEvents()")
+
     // NOTE: this._userID is already set up our caller
-    aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
-      DEBUG("Replaying %s", imEvent)
+    var _imcounter = 0
 
-      updateAgreementHistoryFrom(imEvent)
-      updateLatestIMEventIDFrom(imEvent)
+    aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
+      _imcounter += 1
+      DEBUG("Replaying [%s] %s", _imcounter, imEvent)
+
+      if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
+        // The very first event must be a CREATE event. Otherwise we abort initialization.
+        // This will normally happen during devops :)
+        INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
+        false
+      }
+      else {
+        updateAgreementHistoryFrom(imEvent)
+        true
+      }
     }
   }
 
-  /**
-   * Resource events are processed only if the user has been created and has agreements.
-   * Otherwise nothing can be computed.
-   */
-  private[this] def shouldProcessResourceEvents: Boolean = {
-    haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
-  }
-
   private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = {
     assert(this.haveAgreements, "this.haveAgreements")
-    assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
 
     if(!haveUserStateBootstrap) {
       this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
@@ -218,15 +239,8 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   private[this] def initializeStateOfResourceEvents(): Unit = {
-    if(!this.haveAgreements) {
-      DEBUG("No agreements to initialize resources state")
-      return
-    }
-
-    if(!this.haveUserCreationIMEvent) {
-      DEBUG("No CREATE IMEvent to initialize resources state")
-      return
-    }
+    DEBUG("initializeStateOfResourceEvents()")
+    assert(haveAgreements)
 
     // We will also need this functionality when receiving IMEvents, so we place it in a method
     loadUserStateAndUpdateAgreementHistory()
@@ -240,62 +254,64 @@ class UserActor extends ReflectiveRoleableActor {
   /**
    * Initializes the actor state from DB.
    */
-  def initializeUserActorState(userID: String): Unit = {
+  def initializeUserActorState(userID: String): Boolean = {
     this._userID = userID
 
-    DEBUG("Initializing user actor state")
+    if(initializeStateOfIMEvents() && haveAgreements) { // an empty store also replays "OK"; require agreements
+      initializeStateOfResourceEvents()
+      // Even if we have no resource events, the user is at least CREATEd
+      true
+    }
+    else {
+      false
+    }
+  }
+
+  def createUserStateBootstrap(imEvent: IMEventMsg) {
+    assert(MessageHelpers.isIMEventCreate(imEvent), "MessageHelpers.isIMEventCreate(imEvent)")
+    assert(this._userCreationIMEvent == imEvent, "this._userCreationIMEvent == imEvent")
 
-    initializeStateOfIMEvents()
-    initializeStateOfResourceEvents()
+    this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
   }
 
   /**
-   * Process [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s.
-   * When this method is called, we assume that all proper checks have been made and it
-   * is OK to proceed with the event processing.
+   * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
+   * messaging hub (rabbitmq).
    */
   def onIMEventMsg(imEvent: IMEventMsg) {
-    if(this._isFirstMessage) {
+    if(!haveAgreements) {
+      // If we have no agreements so far, then it does not matter what kind of event
+      // this is. So we replay the log (ehm.. store)
       initializeUserActorState(imEvent.getUserID)
-      // we ignore this event, since it is already saved in the store and all messages in
-      // the store have been consulted by initializeUserActorState()
-      this._isFirstMessage = false
+
       return
     }
 
-    val hadUserCreationIMEvent = haveUserCreationIMEvent
+    // Check for out of sync (regarding IMEvents)
+    val isOutOfSyncIM = imEvent.getOccurredMillis < this._latestIMEventOccurredMillis
+    if(isOutOfSyncIM) {
+      // clear all resource state
+      // FIXME implement
 
-    if(!haveAgreements) {
-      initializeStateOfIMEvents()
-    }
-    else {
-      updateAgreementHistoryFrom(imEvent)
-      updateLatestIMEventIDFrom(imEvent)
+      return
     }
 
-    // Must also update user state if we know when in history the life of a user begins
-    if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
-      INFO("Processing user state, since we had a CREATE IMEvent")
-      loadUserStateAndUpdateAgreementHistory()
+    // Check out of sync (regarding ResourceEvents)
+    val isOutOfSyncRC = false // FIXME implement
+    if(isOutOfSyncRC) {
+      // TODO
+
+      return
     }
 
-    logSeparator()
+    // OK, seems good
+    assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
+    updateAgreementHistoryFrom(imEvent)
   }
 
   def onResourceEventMsg(rcEvent: ResourceEventMsg) {
-    if(this._isFirstMessage) {
-      initializeUserActorState(rcEvent.getUserID)
-      // we ignore this event, since it is already saved in the store and all messages in
-      // the store have been consulted by initializeUserActorState()
-      this._isFirstMessage = false
-      return
-    }
-
-    if(!shouldProcessResourceEvents) {
-      // This means the user has not been created (at least, as far as Aquarium is concerned).
-      // So, we do not process any resource event
-      DEBUG("Not processing %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
-      logSeparator()
+    if(!haveAgreements) {
+      DEBUG("No agreement. Ignoring %s", rcEvent)
 
       return
     }
@@ -370,15 +386,15 @@ class UserActor extends ReflectiveRoleableActor {
     }
     else {
       DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
-                TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
-                TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
+        TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
+        TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
 
       computeBatch()
     }
     val newTotalCredits = this._userState.totalCredits
     if(oldTotalCredits * newTotalCredits < 0)
       aquarium.eventBus ! new BalanceEvent(this._userState.userID,
-                                           newTotalCredits>=0)
+        newTotalCredits>=0)
     DEBUG("Updated %s", this._userState)
     logSeparator()
   }
@@ -399,15 +415,15 @@ class UserActor extends ReflectiveRoleableActor {
       sender ! GetUserBillResponse(Right(billData))
     } catch {
       case e:Exception =>
-       e.printStackTrace()
-       sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
+        e.printStackTrace()
+        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
     }
   }
 
   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
     val userID = event.userID
 
-    (haveUserCreationIMEvent, haveUserState) match {
+    (haveAgreements, haveUserState) match {
       case (true, true) ⇒
         // (User CREATEd, with balance state)
         val realtimeMillis = TimeHelpers.nowMillis()
@@ -427,7 +443,7 @@ class UserActor extends ReflectiveRoleableActor {
             GetUserBalanceResponseData(
               this._userID,
               aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis)
-        )))
+            )))
 
       case (false, true) ⇒
         // (Not CREATEd, with balance state)
@@ -475,20 +491,20 @@ class UserActor extends ReflectiveRoleableActor {
               this._userID,
               this._userState.totalCredits,
               MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries)
-        )))
+            )))
 
       case false ⇒
         DEBUG("!haveWorkingUserState: %s", event)
-        haveUserCreationIMEvent match {
+        haveAgreements match {
           case true ⇒
-            DEBUG("haveUserCreationIMEvent: %s", event)
+            DEBUG("haveAgreements: %s", event)
             sender ! GetUserWalletResponse(
               Right(
                 GetUserWalletResponseData(
                   this._userID,
                   aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis),
                   MessageFactory.newWalletEntriesMsg()
-            )))
+                )))
 
           case false ⇒
             DEBUG("!haveUserCreationIMEvent: %s", event)
index 3eb7610..f708bd5 100644 (file)
@@ -44,10 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean
 import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
 import com.ckkloverdos.props.{Props ⇒ KKProps}
 import gr.grnet.aquarium.actor.service.user.UserActor
-import gr.grnet.aquarium.util.date.TimeHelpers
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
 import akka.dispatch.{Await, Future}
 import akka.util.Duration
+import gr.grnet.aquarium.actor.message.config.SetUserActorUserID
 
 /**
  * A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
@@ -231,6 +231,7 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
         // Cache it for subsequent calls
         _userActorCache.put(userID, actorRef)
 
+        actorRef ! SetUserActorUserID(userID)
         actorRef
       }
     })
index 479f1ef..8a36cc9 100644 (file)
@@ -67,5 +67,5 @@ trait IMEventStore {
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def foreachIMEventInOccurrenceOrder(userID: String)(f: IMEventMsg ⇒ Unit): Unit
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: IMEventMsg ⇒ Boolean): Boolean
 }
\ No newline at end of file
index 301db6d..2330c45 100644 (file)
@@ -185,12 +185,14 @@ extends StoreProvider
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Unit) = {
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
+    var _shouldContinue = true
     for {
-      msg <- _imEvents
+      msg <- _imEvents if _shouldContinue && msg.getUserID == userID // replay only this user's events
     } {
-      f(msg)
+      _shouldContinue = f(msg)
     }
+    _shouldContinue
   }
   //- IMEventStore
 
index 31dd626..6b46112 100644 (file)
@@ -284,19 +284,22 @@ class MongoDBStore(
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Unit) = {
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
     val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
 
+    var _shouldContinue = true
     withCloseable(cursor) { cursor ⇒
-      while(cursor.hasNext) {
+      while(_shouldContinue && cursor.hasNext) {
         val dbObject = cursor.next()
         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
         val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
 
-        f(msg)
+        _shouldContinue = f(msg)
       }
     }
+
+    _shouldContinue
   }
   //-IMEventStore