More complete handling of IM events
author: Christos KK Loverdos <loverdos@gmail.com>
Fri, 25 May 2012 14:08:16 +0000 (17:08 +0300)
committer: Christos KK Loverdos <loverdos@gmail.com>
Fri, 25 May 2012 14:08:16 +0000 (17:08 +0300)
12 files changed:
src/main/scala/gr/grnet/aquarium/actor/ActorRole.scala
src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala
src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala
src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala
src/main/scala/gr/grnet/aquarium/event/model/ExternalEventModel.scala
src/main/scala/gr/grnet/aquarium/event/model/im/IMEventModel.scala
src/main/scala/gr/grnet/aquarium/store/IMEventStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

index 0ddb187..ed8d2ac 100644 (file)
@@ -38,11 +38,11 @@ import service.router.RouterActor
 import service.pinger.PingerActor
 import service.rest.RESTActor
 import service.user.{UserActor}
-import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
 import cc.spray.can.{Timeout, RequestContext}
 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
 import gr.grnet.aquarium.actor.message.admin.PingAllRequest
 import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
 
 /**
  * Each actor within Aquarium plays one role.
@@ -127,5 +127,6 @@ case object UserActorRole
                           classOf[ProcessIMEvent],
                           classOf[GetUserBalanceRequest],
                           classOf[GetUserStateRequest]),
-                      Set(classOf[ActorProviderConfigured],
+                      Set(classOf[InitializeUserState],
+                          classOf[ActorProviderConfigured],
                           classOf[AquariumPropertiesLoaded]))
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala b/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala
new file mode 100644 (file)
index 0000000..2943c0f
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.actor.message.config
+
+import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
+
+
+/**
+ * This is sent to a [[gr.grnet.aquarium.actor.service.user.UserActor]] in order to initialize its internal state.
+ *
+ * @param userID the ID of the user whose state the receiving actor initializes
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+case class InitializeUserState(userID: String) extends ActorMessage with ActorConfigurationMessage
index 4851cfe..1accc85 100644 (file)
@@ -41,11 +41,11 @@ import gr.grnet.aquarium.util.shortClassNameOf
 import gr.grnet.aquarium.service.RoleableActorProviderService
 import akka.actor.ActorRef
 import user.{UserActorCache}
-import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
 import gr.grnet.aquarium.actor.message.admin.PingAllRequest
 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
 import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured}
 
 /**
  * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
@@ -63,6 +63,8 @@ class RouterActor extends ReflectiveRoleableActor {
     val userActor = _actorProvider.actorForRole(UserActorRole)
     UserActorCache.put(userID, userActor)
 
+    userActor ! InitializeUserState(userID)
+
     userActor
   }
 
index 0065775..56f2695 100644 (file)
@@ -40,15 +40,14 @@ package user
 import gr.grnet.aquarium.actor._
 
 import gr.grnet.aquarium.util.shortClassNameOf
-import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
 import akka.config.Supervision.Temporary
 import gr.grnet.aquarium.Aquarium
-import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
 import gr.grnet.aquarium.computation.data.IMStateSnapshot
-import gr.grnet.aquarium.computation.UserState
 import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.computation.NewUserState
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
 
 /**
  *
@@ -57,37 +56,38 @@ import gr.grnet.aquarium.event.model.im.IMEventModel
 
 class UserActor extends ReflectiveRoleableActor {
   private[this] var _imState: IMStateSnapshot = _
-  private[this] var _userState: UserState = _
+//  private[this] var _userState: UserState = _
+//  private[this] var _newUserState: NewUserState = _
 
   self.lifeCycle = Temporary
 
-  private[this] def _userID = this._userState.userID
+//  private[this] def _userID = this._newUserState.userID
   private[this] def _shutmedown(): Unit = {
-    if(_haveUserState) {
-      UserActorCache.invalidate(_userID)
-    }
+//    if(_haveUserState) {
+//      UserActorCache.invalidate(_userID)
+//    }
 
     self.stop()
   }
 
   override protected def onThrowable(t: Throwable, message: AnyRef) = {
     logChainOfCauses(t)
-    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
+//    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
 
     _shutmedown()
   }
 
   def role = UserActorRole
 
-  private[this] def _configurator: Aquarium = Aquarium.Instance
+  private[this] def aquarium: Aquarium = Aquarium.Instance
 
   private[this] def _timestampTheshold =
-    _configurator.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
+    aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
 
 
-  private[this] def _haveUserState = {
-    this._userState ne null
-  }
+//  private[this] def _haveUserState = {
+//    this._newUserState ne null
+//  }
 
   private[this] def _haveIMState = {
     this._imState ne null
@@ -99,6 +99,30 @@ class UserActor extends ReflectiveRoleableActor {
   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
+  /**
+   * Rebuilds the IM state of this actor by replaying the stored IM events of the given user, in the
+   * order in which the IM event store hands them over.
+   *
+   * The first replayed event creates the initial snapshot (when there is no snapshot yet); every
+   * following event is folded into the snapshot computed so far.
+   */
+  private[this] def reloadIMState(userID: String): Unit = {
+    aquarium.imEventStore.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
+      logger.debug("Replaying %s".format(imEvent))
+
+      // No snapshot yet means that this is the first event we see for the user
+      this._imState =
+        if(this._imState eq null) {
+          IMStateSnapshot.initial(imEvent)
+        } else {
+          this._imState.copyWithEvent(imEvent)
+        }
+    }
+
+    logger.debug("Recomputed %s".format(this._imState))
+  }
+
+  /**
+   * Handles [[gr.grnet.aquarium.actor.message.config.InitializeUserState]]: logs the message and then
+   * reloads the IM state of the user designated by it.
+   */
+  def onInitializeUserState(event: InitializeUserState): Unit = {
+    logger.debug("Got %s".format(event))
+
+    val userID = event.userID
+    reloadIMState(userID)
+  }
+
   private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
     // FIXME: Implement based on the role
     "default"
@@ -114,25 +138,32 @@ class UserActor extends ReflectiveRoleableActor {
     val hadIMState = _haveIMState
 
     if(hadIMState) {
-
-      this._imState = this._imState.addMostRecentEvent(imEvent)
+      if(this._imState.latestIMEvent.id == imEvent.id) {
+        // This happens when the actor is brought to life, then immediately initialized, and then
+        // sent the first IM event. But from the initialization procedure, this IM event will have
+        // already been loaded from DB!
+        logger.debug("Ignoring first %s after birth".format(imEvent.toDebugString))
+        return
+      }
+
+      this._imState = this._imState.copyWithEvent(imEvent)
     } else {
       this._imState = IMStateSnapshot.initial(imEvent)
     }
 
-    DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
+//    DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
   }
 
   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
     val userId = event.userID
     // FIXME: Implement
-    self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
+//    self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
   }
 
   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
     val userId = event.userID
    // FIXME: Implement
-    self reply GetUserStateResponse(userId, Right(this._userState))
+//    self reply GetUserStateResponse(userId, Right(this._userState))
   }
 
   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
@@ -142,28 +173,28 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
 
-  private[this] def D_userID = {
-    if(this._userState eq null)
-      if(this._imState eq null)
-        "<NOT INITIALIZED>"
-      else
-        this._imState.imEvent.userID
-    else
-      this._userState.userID
-  }
-
-  private[this] def DEBUG(fmt: String, args: Any*) =
-    logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
-  private[this] def INFO(fmt: String, args: Any*) =
-    logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
-  private[this] def WARN(fmt: String, args: Any*) =
-    logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
-  private[this] def ERROR(fmt: String, args: Any*) =
-    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
-  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
-    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+//  private[this] def D_userID = {
+//    if(this._newUserState eq null)
+//      if(this._imState eq null)
+//        "<NOT INITIALIZED>"
+//      else
+//        this._imState.latestIMEvent.userID
+//    else
+//      this._newUserState.userID
+//  }
+//
+//  private[this] def DEBUG(fmt: String, args: Any*) =
+//    logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+//  private[this] def INFO(fmt: String, args: Any*) =
+//    logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+//  private[this] def WARN(fmt: String, args: Any*) =
+//    logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+//  private[this] def ERROR(fmt: String, args: Any*) =
+//    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+//  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
+//    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
 }
index c0c9c62..8b2642e 100644 (file)
@@ -57,10 +57,11 @@ case class IMStateSnapshot(
                             */
                            roleHistory: RoleHistory) {
 
-  def addMostRecentEvent(newEvent: IMEventModel) = {
+  def copyWithEvent(imEvent: IMEventModel) = {
     copy(
-      latestIMEvent = newEvent,
-      roleHistory = roleHistory.addMostRecentRole(newEvent.role, newEvent.occurredMillis)
+      hasBeenActivated = this.hasBeenActivated || imEvent.isActive,
+      latestIMEvent    = imEvent,
+      roleHistory      = this.roleHistory.copyWithRole(imEvent.role, imEvent.occurredMillis)
     )
   }
 }
index 5a710e6..fb97aab 100644 (file)
@@ -37,6 +37,8 @@ package gr.grnet.aquarium.computation.data
 
 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
 import scala.collection.immutable.{TreeMap, SortedMap}
+import scala.collection.mutable.ListBuffer
+import scala.annotation.tailrec
 
 /**
  *
@@ -52,20 +54,46 @@ case class RoleHistory(
     TreeMap(roles.map(role => (role.timeslot, role.name)): _*)
   }
 
-  /**
-   * Insert the new role in front of the other ones and adjust the `validTo` limit of the previous role.
-   */
-  def addMostRecentRole(role: String, validFrom: Long) = {
-    val newItem = RoleHistoryItem(role, validFrom)
-    val newRoles = roles match {
+  /**
+   * Returns a new history that also contains the given role, valid from `validFrom`.
+   *
+   * `roles` is kept with the most recent item first. If the head does not start strictly after
+   * `validFrom`, the new role becomes the new head and the previous head is closed at `validFrom`.
+   * Otherwise the items are scanned from newest to oldest and the new role is placed right before
+   * the first item that does not start strictly after `validFrom`. In that case the new role is
+   * valid up to the start of the item that follows it in time, the item that precedes it in time is
+   * closed at `validFrom`, and all older items are retained.
+   *
+   * @param role      the name of the role to record
+   * @param validFrom the time (in millis) from which the role is valid
+   */
+  def copyWithRole(role: String, validFrom: Long) = {
+    val newItems = roles match {
+      case Nil ⇒
+        RoleHistoryItem(role, validFrom) :: Nil
+
       case head :: tail ⇒
-        newItem :: head.withNewValidTo(validFrom) :: tail
+        if(head.isStrictlyAfter(validFrom)) {
+          // must search history items to find where this fits in
+          @tailrec
+          def check(allChecked: ListBuffer[RoleHistoryItem],
+                    lastCheck: RoleHistoryItem,
+                    toCheck: List[RoleHistoryItem]): List[RoleHistoryItem] = {
 
-      case Nil ⇒
-        newItem :: Nil
+            toCheck match {
+              case Nil ⇒
+                // The new role is older than every known item: it ends where the oldest one starts
+                allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                allChecked.toList
+
+              case toCheckHead :: toCheckTail ⇒
+                if(toCheckHead.isStrictlyAfter(validFrom)) {
+                  allChecked.append(toCheckHead)
+
+                  check(allChecked, toCheckHead, toCheckTail)
+                } else {
+                  // The new role fits between lastCheck (newer) and toCheckHead (older).
+                  allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                  // Close the older neighbour where the new role starts (same as the prepend case
+                  // below) and keep the rest of the history, instead of silently dropping it.
+                  allChecked.append(toCheckHead.copyWithValidTo(validFrom))
+                  allChecked ++= toCheckTail
+                  allChecked.toList
+                }
+            }
+          }
+
+          val buffer = new ListBuffer[RoleHistoryItem]
+          buffer.append(head)
+          check(buffer, head, tail)
+        } else {
+          // assume head.validTo goes to infinity
+          RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(validFrom) :: tail
+        }
     }
 
-    RoleHistory(newRoles)
+    RoleHistory(newItems)
   }
 }
 
index afa6deb..c20100c 100644 (file)
@@ -37,6 +37,7 @@ package gr.grnet.aquarium.computation.data
 
 import gr.grnet.aquarium.util.date.MutableDateCalc
 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import gr.grnet.aquarium.Aquarium
 
 /**
  *
@@ -59,9 +60,16 @@ case class RoleHistoryItem(
      */
     validTo: Long = Long.MaxValue) {
 
-  require(
-    validFrom <= validTo,
-    "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validFrom)))
+  try {
+    require(
+      validFrom <= validTo,
+      "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
+  }
+  catch {
+    case e: IllegalArgumentException ⇒
+      Aquarium.Instance.debug(this, "!! validFrom = %s, validTo = %s, dx=%s", validFrom, validTo, validTo-validFrom)
+      throw e
+  }
 
   require(name ne null, "Name is not null")
 
@@ -69,7 +77,19 @@ case class RoleHistoryItem(
 
   def timeslot = Timeslot(validFrom, validTo)
 
-  def withNewValidTo(newValidTo: Long) = copy(validTo = newValidTo)
+  def copyWithValidTo(newValidTo: Long) = copy(validTo = newValidTo)
+
+  /**
+   * `true` iff `validTo` is something other than `Long.MaxValue`, which is the default of `validTo`.
+   */
+  def isUpperBounded = !(validTo == Long.MaxValue)
+
+  /**
+   * `true` iff `time` falls within the half-open interval `[validFrom, validTo)`.
+   */
+  def contains(time: Long) = (time >= validFrom) && (validTo > time)
+
+  /**
+   * `true` iff this item starts later than `time`, that is `time` is strictly before `validFrom`.
+   */
+  def isStrictlyAfter(time: Long) = time < validFrom
 
   override def toString =
     "RoleHistoryItem(%s, [%s, %s))".
index 4530a7a..cb3f373 100644 (file)
@@ -39,6 +39,7 @@ import gr.grnet.aquarium.util.makeBytes
 import gr.grnet.aquarium.util.shortClassNameOf
 import gr.grnet.aquarium.util.json.JsonSupport
 import gr.grnet.aquarium.util.xml.XmlSupport
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 /**
  * The base model for all events coming from external systems.
@@ -59,7 +60,11 @@ trait ExternalEventModel extends EventModel with JsonSupport with XmlSupport {
 
   def withReceivedMillis(newReceivedMillis: Long): ExternalEventModel
 
-  def toDebugString = "%s(userID=%s, id=%s)".format(shortClassNameOf(this), userID, id)
+  /**
+   * A short description of this event for debugging purposes: short class name, user ID, event ID
+   * and the occurrence time as rendered by `MutableDateCalc`.
+   */
+  def toDebugString = {
+    val className = shortClassNameOf(this)
+    val user      = userID
+    val eventID   = id
+    val occurred  = new MutableDateCalc(occurredMillis).toString
+
+    "%s(userID=%s, id=%s, occurred=%s)".format(className, user, eventID, occurred)
+  }
 }
 
 object ExternalEventModel {
index f6b709a..870efa0 100644 (file)
@@ -37,6 +37,7 @@ package gr.grnet.aquarium.event.model
 package im
 
 import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 /**
  * The model of any event sent from the `Identity Management` (IM) external system.
@@ -64,7 +65,13 @@ trait IMEventModel extends ExternalEventModel {
   def isModifyUser = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.modify)
 
   override def toDebugString = {
-    "%s(userID=%s, id=%s, isActive=%s, role='%s')".format(shortClassNameOf(this), userID, id, isActive, role)
+    "%s(userID=%s, id=%s, isActive=%s, role='%s', occurred=%s)".format(
+      shortClassNameOf(this),
+      userID,
+      id,
+      isActive,
+      role,
+      new MutableDateCalc(occurredMillis).toString)
   }
 }
 
index bd459ac..62e5e62 100644 (file)
@@ -75,4 +75,12 @@ trait IMEventStore {
    *
    */
   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent]
+
+  /**
+   * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
+   * the given function `f`.
+   *
+   * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
+   *
+   * @param userID the ID of the user whose events are replayed
+   * @param f      the function that is applied to each one of the events, in replay order
+   */
+  def replayIMEventsInOccurrenceOrder(userID: String)(f: IMEvent ⇒ Unit): Unit
 }
\ No newline at end of file
index 9726383..c23605a 100644 (file)
@@ -352,6 +352,18 @@ class MemStore extends UserStateStore
         None
     }
   }
+
+  /**
+   * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
+   * the given function `f`.
+   *
+   * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
+   */
+  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+    // Sort by the timestamp itself. A `sortWith` comparator must be a strict "less than"; using `<=`
+    // reports two events with equal `occurredMillis` as smaller than each other, which breaks the
+    // comparator contract of the underlying sort (it may then fail with an IllegalArgumentException
+    // or reorder ties arbitrarily). `sortBy` is stable, so events with the same timestamp keep the
+    // relative order in which they were found.
+    imEventById.valuesIterator.
+      filter(_.userID == userID).
+      toSeq.
+      sortBy(_.occurredMillis).
+      foreach(f)
+  }
   //- IMEventStore
 
   def loadPolicyEntriesAfter(after: Long) =
index a8f517e..25981bd 100644 (file)
@@ -381,8 +381,28 @@ class MongoDBStore(
       }
    }
   }
+
+  /**
+   * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
+   * the given function `f`.
+   *
+   * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
+   */
+  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+    // All the events of the user, with ascending `occurredMillis` (sort direction 1)
+    val byUserID = new BasicDBObject(IMEventNames.userID, userID)
+    val sortedEvents = imEvents.find(byUserID).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
+
+    // NOTE(review): withCloseable presumably closes the cursor even if `f` throws - confirm
+    withCloseable(sortedEvents) { events ⇒
+      while(events.hasNext) {
+        f(MongoDBIMEvent.fromDBObject(events.next()))
+      }
+    }
+  }
   //-IMEventStore
 
+
+
   //+PolicyStore
   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,