WIP: IMEventModel end-to-end chain
author Christos KK Loverdos <loverdos@gmail.com>
Thu, 3 May 2012 12:57:45 +0000 (15:57 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Thu, 3 May 2012 12:57:45 +0000 (15:57 +0300)
Keeping IMState locally in the UserActor.

src/main/resources/logback.xml
src/main/scala/gr/grnet/aquarium/actor/ActorRole.scala
src/main/scala/gr/grnet/aquarium/actor/message/service/router/RouterMessage.scala
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserState.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

index 95634fe..316318c 100644 (file)
@@ -24,7 +24,7 @@
 
   <logger name="cc.spray.can" level="INFO"/>
 
-  <logger name="gr.grnet" level="INFO"/>
+  <logger name="gr.grnet" level="DEBUG"/>
 
   <root level="DEBUG">
     <appender-ref ref="FILE"/>
index 858f776..15cb372 100644 (file)
@@ -121,8 +121,7 @@ case object UserActorRole
     extends ActorRole("UserActorRole",
                       false,
                       classOf[UserActor],
-                      Set(classOf[ProcessSetUserID],
-                          classOf[ProcessResourceEvent],
+                      Set(classOf[ProcessResourceEvent],
                           classOf[ProcessIMEvent],
                           classOf[RequestUserBalance],
                           classOf[UserRequestGetState]),
index 3c41427..9d3a403 100644 (file)
@@ -102,6 +102,4 @@ case class ProcessResourceEvent(rcEvent: ResourceEventModel) extends RouterMessa
  */
 case class ProcessIMEvent(imEvent: IMEventModel) extends RouterMessage
 
-case class ProcessSetUserID(userID: String) extends RouterMessage
-
 case class AdminRequestPingAll() extends RouterMessage
index 5d7f780..9fe11cf 100644 (file)
@@ -60,7 +60,6 @@ class RouterActor extends ReflectiveRoleableActor {
     val userActor = _actorProvider.actorForRole(UserActorRole)
     UserActorCache.put(userID, userActor)
     UserActorSupervisor.supervisor.link(userActor)
-    userActor ! ProcessSetUserID(userID)
 
     userActor
   }
index 2facf1f..67c4abb 100644 (file)
@@ -41,12 +41,12 @@ import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.user._
 
 import gr.grnet.aquarium.util.shortClassNameOf
-import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.actor.message.service.router._
 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
 import gr.grnet.aquarium.event.im.IMEventModel
 import akka.config.Supervision.Temporary
 import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator}
+import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
 
 
 /**
@@ -55,14 +55,15 @@ import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _userID: String = _
+  private[this] var _imState: IMStateSnapshot = _
   private[this] var _userState: UserState = _
 
   self.lifeCycle = Temporary
 
+  private[this] def _userID = this._userState.userID
   private[this] def _shutmedown(): Unit = {
-    if(_haveFullState) {
-      UserActorCache.invalidate(this._userID)
+    if(_haveUserState) {
+      UserActorCache.invalidate(_userID)
     }
 
     self.stop()
@@ -78,21 +79,19 @@ class UserActor extends ReflectiveRoleableActor {
   def role = UserActorRole
 
   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
-//  private[this] def _userId = _userState.userId
 
   private[this] def _timestampTheshold =
     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
 
 
-  private[this] def _haveFullState = {
-    (this._userID ne null) && (this._userState ne null)
+  private[this] def _haveUserState = {
+    this._userState ne null
   }
 
-  private[this] def _havePartialState = {
-    (this._userID ne null) && (this._userState eq null)
+  private[this] def _haveIMState = {
+    this._imState ne null
   }
 
-
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
@@ -104,103 +103,30 @@ class UserActor extends ReflectiveRoleableActor {
     "default"
   }
 
-  private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
-    this._userID = imEvent.userID
-
-    val store = _configurator.storeProvider.userStateStore
-    // try find user state. normally should ot exist
-    val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
-    if(latestUserStateOpt.isDefined) {
-      logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
-        this._userID,
-        shortClassNameOf(imEvent),
-        imEvent.eventType))
-
-      return
-    }
-
-    val initialAgreementName = _getAgreementNameForNewUser(imEvent)
-    val newUserState    = DefaultUserStateComputations.createInitialUserState(
-      this._userID,
-      imEvent.occurredMillis,
-      imEvent.isActive,
-      0.0,
-      List(imEvent.role),
-      initialAgreementName)
-
-    this._userState = newUserState
-
-    // FIXME: If this fails, then the actor must be shut down.
-    store.insertUserState(newUserState)
-  }
-
-  private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
-    val now = TimeHelpers.nowMillis()
-
-    if(!_haveFullState) {
-      ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
-      _shutmedown()
-      return
-    }
-
-    this._userState = this._userState.modifyFromIMEvent(imEvent, now)
-  }
-
-  def onProcessSetUserID(event: ProcessSetUserID): Unit = {
-    this._userID = event.userID
-  }
-
   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
     val now = TimeHelpers.nowMillis()
 
     val imEvent = event.imEvent
-    // If we already have a userID but it does not match the incoming userID, then this is an internal error
-    if(_havePartialState && (this._userID != imEvent.userID)) {
-      throw new AquariumInternalError(
-        "Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
-    }
-
-    // If we get an IMEvent without having a user state, then we query for the latest user state.
-    if(!_haveFullState) {
-      val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
-      this._userState = userStateOpt match {
-        case Some(userState) ⇒
-          userState
-
-        case None ⇒
-          val initialAgreementName = _getAgreementNameForNewUser(imEvent)
-          val initialUserState = DefaultUserStateComputations.createInitialUserState(
-            this._userID,
-            imEvent.occurredMillis,
-            imEvent.isActive,
-            0.0,
-            List(imEvent.role),
-            initialAgreementName)
-
-          DEBUG("Got initial state")
-          initialUserState
-      }
-    }
+    val isUpdate = if(_haveIMState) {
+      val newOccurredMillis = imEvent.occurredMillis
+      val currentOccurredMillis = this._imState.imEvent.occurredMillis
 
-    if(imEvent.isModifyUser && this._userState.isInitial) {
-      INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
-      return
-    }
-
-    if(imEvent.isCreateUser && !this._userState.isInitial) {
-      INFO("Got a '%s' but my state is not initial", imEvent.eventType)
-      return
-    }
+      if(newOccurredMillis < currentOccurredMillis) {
+        INFO(
+          "Ignoring older IMEvent: [%s] < [%s]",
+          new MutableDateCalc(newOccurredMillis).toYYYYMMDDHHMMSSSSS,
+          new MutableDateCalc(currentOccurredMillis).toYYYYMMDDHHMMSSSSS)
 
-    this._userState = this._userState.modifyFromIMEvent(imEvent, now)
+        return
+      }
 
-    if(imEvent.isCreateUser) {
-      processCreateUser(imEvent)
-    } else if(imEvent.isModifyUser) {
-      processModifyUser(imEvent)
+      true
     } else {
-      throw new AquariumException("Cannot interpret %s".format(imEvent))
+      false
     }
+
+    this._imState = IMStateSnapshot(imEvent, now)
+    DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
   }
 
   def onRequestUserBalance(event: RequestUserBalance): Unit = {
@@ -220,13 +146,13 @@ class UserActor extends ReflectiveRoleableActor {
 
 
   private[this] def D_userID = {
-    if(this._userID eq null)
-      "<NOT INITIALIZED>" // We always get a userID first
-    else
-      if(this._userState eq null)
-        "%s, NO STATE".format(this._userID)
+    if(this._userState eq null)
+      if(this._imState eq null)
+        "<NOT INITIALIZED>"
       else
-        "%s".format(this._userID)
+        this._imState.imEvent.userID
+    else
+      this._userState.userID
   }
 
   private[this] def DEBUG(fmt: String, args: Any*) =
@@ -242,5 +168,5 @@ class UserActor extends ReflectiveRoleableActor {
     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
 
   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
-      logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
 }
index e34c64d..55753ab 100644 (file)
@@ -45,6 +45,7 @@ import logic.accounting.dsl.{Timeslot, DSLAgreement}
 import collection.immutable.{TreeMap, SortedMap}
 import util.date.MutableDateCalc
 import event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.IMEventModel
 
 /**
  * Snapshot of data that are user-related.
@@ -54,7 +55,9 @@ import event.resource.ResourceEventModel
 
 case class CreditSnapshot(creditAmount: Double, snapshotTime: Long) extends DataSnapshot
 
-case class RolesSnapshot(roles: List[String], snapshotTime: Long) extends DataSnapshot
+case class IMStateSnapshot(imEvent: IMEventModel, snapshotTime: Long) extends DataSnapshot
+
+//case class RolesSnapshot(roles: List[String], snapshotTime: Long) extends DataSnapshot
 
 /**
  * Represents an agreement valid for a specific amount of time. By convention,
@@ -132,11 +135,11 @@ case class AgreementSnapshot(agreements: List[Agreement], snapshotTime: Long) ex
  *  - If the resource is complex, the (name, instanceId) is (DSLResource.name, instance-id)
  *  - If the resource is simple,  the (name, instanceId) is (DSLResource.name, "1")
  *
- * @param resource        Same as `resource` of [[gr.grnet.aquarium.event.ResourceEvent]]
- * @param instanceId      Same as `instanceId` of [[gr.grnet.aquarium.event.ResourceEvent]]
+ * @param resource        Same as `resource` of [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
+ * @param instanceId      Same as `instanceId` of [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
  * @param instanceAmount  This is the amount kept for the resource instance.
 *                         The general rule is that an amount saved in a [[gr.grnet.aquarium.user.ResourceInstanceSnapshot]]
- *                        represents a total value, while a value appearing in a [[gr.grnet.aquarium.event.ResourceEvent]]
+ *                        represents a total value, while a value appearing in a [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
  *                        represents a difference. How these two values are combined to form the new amount is dictated
  *                        by the underlying [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
  * @param snapshotTime
@@ -240,7 +243,7 @@ class DataSnapshotException(msg: String) extends Exception(msg)
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-case class ActiveStateSnapshot(isActive: Boolean, snapshotTime: Long) extends DataSnapshot
+//case class ActiveStateSnapshot(isActive: Boolean, snapshotTime: Long) extends DataSnapshot
 
 /**
  * Keeps the latest resource event per resource instance.
index c645798..9e0ad26 100644 (file)
@@ -75,10 +75,8 @@ import org.bson.types.ObjectId
  * @param latestResourceEventsSnapshot
  * @param billingPeriodResourceEventsCounter
  * @param billingPeriodOutOfSyncResourceEventsCounter
- * @param activeStateSnapshot
  * @param creditsSnapshot
  * @param agreementsSnapshot
- * @param rolesSnapshot
  * @param ownedResourcesSnapshot
  * @param newWalletEntries
  *          The wallet entries computed. Not all user states need to holds wallet entries,
@@ -157,11 +155,9 @@ case class UserState(
      * the billing period recorded by `billingPeriodSnapshot`
      */
     billingPeriodOutOfSyncResourceEventsCounter: Long,
-
-    activeStateSnapshot: ActiveStateSnapshot,
+    imStateSnapshot: IMStateSnapshot,
     creditsSnapshot: CreditSnapshot,
     agreementsSnapshot: AgreementSnapshot,
-    rolesSnapshot: RolesSnapshot,
     ownedResourcesSnapshot: OwnedResourcesSnapshot,
     newWalletEntries: List[NewWalletEntry],
     // The last known change reason for this userState
@@ -175,8 +171,8 @@ case class UserState(
 
   private[this] def _allSnapshots: List[Long] = {
     List(
-      activeStateSnapshot.snapshotTime,
-      creditsSnapshot.snapshotTime, agreementsSnapshot.snapshotTime, rolesSnapshot.snapshotTime,
+      imStateSnapshot.snapshotTime,
+      creditsSnapshot.snapshotTime, agreementsSnapshot.snapshotTime,
       ownedResourcesSnapshot.snapshotTime,
       implicitlyIssuedSnapshot.snapshotTime,
       latestResourceEventsSnapshot.snapshotTime
@@ -232,12 +228,10 @@ case class UserState(
   def resourcesMap = ownedResourcesSnapshot.toResourcesMap
 
   def modifyFromIMEvent(imEvent: IMEventModel, snapshotMillis: Long): UserState = {
-    val changeReason = IMEventArrival(imEvent)
     this.copy(
       isInitial = false,
-      activeStateSnapshot = ActiveStateSnapshot(imEvent.isActive, snapshotMillis),
-      rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis),
-      lastChangeReason = changeReason
+      imStateSnapshot = IMStateSnapshot(imEvent, snapshotMillis),
+      lastChangeReason = IMEventArrival(imEvent)
     )
   }
 
index e00a37e..24fa20b 100644 (file)
@@ -44,16 +44,50 @@ import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
 import gr.grnet.aquarium.logic.accounting.Accounting
 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.AquariumException
 import gr.grnet.aquarium.event.{NewWalletEntry}
 import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
+import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 class UserStateComputations extends Loggable {
-  def createInitialUserState(userId: String,
+  def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
+    if(!imEvent.isCreateUser) {
+      throw new AquariumInternalError(
+        "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
+    }
+
+    val userID = imEvent.userID
+    val userCreationMillis = imEvent.occurredMillis
+    val now = TimeHelpers.nowMillis()
+
+    UserState(
+      true,
+      userID,
+      userCreationMillis,
+      0L,
+      false,
+      null,
+      ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+      Nil,
+      Nil,
+      LatestResourceEventsSnapshot(List(), now),
+      0L,
+      0L,
+      IMStateSnapshot(imEvent, now),
+      CreditSnapshot(credits, now),
+      AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+      OwnedResourcesSnapshot(Nil, now),
+      Nil,
+      InitialUserStateSetup
+    )
+  }
+
+  def createInitialUserState(userID: String,
                              userCreationMillis: Long,
                              isActive: Boolean,
                              credits: Double,
@@ -63,7 +97,7 @@ class UserStateComputations extends Loggable {
 
     UserState(
       true,
-      userId,
+      userID,
       userCreationMillis,
       0L,
       false,
@@ -74,10 +108,18 @@ class UserStateComputations extends Loggable {
       LatestResourceEventsSnapshot(List(), now),
       0L,
       0L,
-      ActiveStateSnapshot(isActive, now),
+      IMStateSnapshot(
+        StdIMEvent(
+          "",
+          now, now, userID,
+          "",
+          isActive, roleNames.headOption.getOrElse("default"),
+          "1.0",
+          IMEventModel.EventTypeNames.create, Map()),
+        now
+      ),
       CreditSnapshot(credits, now),
       AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
-      RolesSnapshot(roleNames, now),
       OwnedResourcesSnapshot(Nil, now),
       Nil,
       InitialUserStateSetup
@@ -86,13 +128,9 @@ class UserStateComputations extends Loggable {
 
   def createInitialUserStateFrom(us: UserState): UserState = {
     createInitialUserState(
-      us.userID,
-      us.userCreationMillis,
-      us.activeStateSnapshot.isActive,
+      us.imStateSnapshot.imEvent,
       us.creditsSnapshot.creditAmount,
-      us.rolesSnapshot.roles,
-      us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
-    )
+      us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
   }
 
   def findUserStateAtEndOfBillingMonth(userId: String,
index d318056..2edbf14 100644 (file)
@@ -241,7 +241,7 @@ aquariumpolicy:
   val UserCKKL  = Aquarium.newUser("CKKL", UserCreationDate)
 
   val InitialUserState = Computations.createInitialUserState(
-    userId = UserCKKL.userId,
+    userID = UserCKKL.userId,
     userCreationMillis = UserCreationDate.getTime,
     isActive = true,
     credits = 0.0,