Being devops-friendly until a user creation event arrives
authorChristos KK Loverdos <loverdos@gmail.com>
Tue, 5 Jun 2012 07:36:10 +0000 (10:36 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Tue, 5 Jun 2012 07:36:10 +0000 (10:36 +0300)
15 files changed:
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/computation/UserState.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala
src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala
src/main/scala/gr/grnet/aquarium/computation/reason/UserStateChangeReason.scala
src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/converter/JsonConversions.scala
src/main/scala/gr/grnet/aquarium/event/model/im/IMEventModel.scala
src/main/scala/gr/grnet/aquarium/store/IMEventStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

index 825589d..fcb5b4a 100644 (file)
@@ -48,8 +48,8 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.{AquariumException, Aquarium}
 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.computation.reason.{InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
+import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
 
 /**
  *
@@ -101,79 +101,74 @@ class UserActor extends ReflectiveRoleableActor {
   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
-  private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel): (Boolean, Boolean, String) = {
+  private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
     if(haveIMState) {
-      val currentRole = this._imState.roleHistory.lastRole.map(_.name).getOrElse(null)
-//      logger.debug("Current role = %s".format(currentRole))
+      val (newState,
+           creationTimeChanged,
+           activationTimeChanged,
+           roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
 
-      if(imEvent.role != currentRole) {
-//        logger.debug("New role = %s".format(imEvent.role))
-        this._imState = this._imState.updateRoleHistoryWithEvent(imEvent)
-        (true, false, "")
-      } else {
-        val noUpdateReason = "Same role '%s'".format(currentRole)
-//        logger.debug(noUpdateReason)
-        (false, false, noUpdateReason)
-      }
+      this._imState = newState
+      (creationTimeChanged, activationTimeChanged, roleChanged)
     } else {
       this._imState = IMStateSnapshot.initial(imEvent)
-      (true, true, "")
+      (
+        imEvent.isCreateUser,
+        true, // first activation status is a change by default??
+        true  // first role is a change by default??
+        )
     }
   }
 
   /**
    * Creates the IMStateSnapshot and returns the number of updates it made to it.
    */
-  private[this] def createInitialIMState(event: InitializeUserState): Int = {
-    val userID = event.userID
+  private[this] def createInitialIMState(): Unit = {
     val store = aquarium.imEventStore
 
-    var _updateCount = 0
+    var _roleCheck = None: Option[String]
 
-    store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
+    // this._userID is already set up
+    store.replayIMEventsInOccurrenceOrder(this._userID) { imEvent ⇒
       DEBUG("Replaying %s", imEvent)
 
-      val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
-      if(updated) {
-        _updateCount = _updateCount + 1
-        DEBUG("Updated %s for role '%s'", shortNameOfType[IMStateSnapshot], imEvent.role)
-      } else {
-        DEBUG("Not updated %s due to: %s", shortNameOfType[IMStateSnapshot], noUpdateReason)
-      }
-    }
+      val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
+      _roleCheck = this._imState.roleHistory.lastRoleName
 
-    if(_updateCount > 0)
-      DEBUG("Computed %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
-    else
-      DEBUG("Not computed %s", shortNameOfType[IMStateSnapshot])
+      DEBUG(
+        "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
+        creationTimeChanged, activationTimeChanged, roleChanged,
+        imEvent
+      )
+    }
 
-    _updateCount
+    DEBUG("New %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
   }
 
   /**
    * Resource events are processed only if the user has been activated.
    */
   private[this] def shouldProcessResourceEvents: Boolean = {
-    haveIMState && this._imState.hasBeenActivated
+    haveIMState && this._imState.hasBeenCreated
   }
 
   private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
-    val userActivationMillis = this._imState.userActivationMillis.get
+    val userCreationMillis = this._imState.userCreationMillis.get
     val initialRole = this._imState.roleHistory.firstRole.get.name
 
     val userStateBootstrap = UserStateBootstrappingData(
       this._userID,
-      userActivationMillis,
+      userCreationMillis,
       initialRole,
-      aquarium.initialAgreementForRole(initialRole, userActivationMillis),
-      aquarium.initialBalanceForRole(initialRole, userActivationMillis)
+      aquarium.initialAgreementForRole(initialRole, userCreationMillis),
+      aquarium.initialBalanceForRole(initialRole, userCreationMillis)
     )
 
     val userState = userStateComputations.doFullMonthlyBilling(
       userStateBootstrap,
       BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()),
       aquarium.currentResourcesMap,
-      InitialUserStateSetup,
+      InitialUserStateSetup(),
       None
     )
 
@@ -182,7 +177,7 @@ class UserActor extends ReflectiveRoleableActor {
     // Final touch: Update role history
     if(haveIMState && haveUserState) {
       if(this._userState.roleHistory != this._imState.roleHistory) {
-        this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup)
+        this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
       }
     }
   }
@@ -194,9 +189,9 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    if(!this._imState.hasBeenActivated) {
+    if(!this._imState.hasBeenCreated) {
       // Cannot set the initial state!
-      DEBUG("Cannot create %s from %s, since user is inactive", shortNameOfType[UserState], event)
+      DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
       return
     }
 
@@ -210,11 +205,10 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onInitializeUserState(event: InitializeUserState): Unit = {
-    val userID = event.userID
-    this._userID = userID
+    this._userID = event.userID
     DEBUG("Got %s", event)
 
-    createInitialIMState(event)
+    createInitialIMState()
     createInitialUserState(event)
   }
 
@@ -254,26 +248,30 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
+    val (creationTimeChanged,
+         activationTimeChanged,
+         roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
 
-    if(updated) {
-      DEBUG("Updated %s = %s", shortClassNameOf(this._imState), this._imState)
+    DEBUG(
+      "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
+      creationTimeChanged, activationTimeChanged, roleChanged,
+      imEvent
+    )
 
-      // Must also update user state
-      if(shouldProcessResourceEvents) {
-        if(!haveUserState) {
-          loadUserStateAndUpdateRoleHistory()
-          INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
-        } else {
-          // Just update role history
-          this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
-          INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
-        }
+    // Must also update user state if we know when in history the life of a user begins
+    if(creationTimeChanged) {
+      if(!haveUserState) {
+        loadUserStateAndUpdateRoleHistory()
+        INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
+      } else {
+        // Just update role history
+        this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
+        INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
       }
-    } else {
-      DEBUG("Not updating %s from %s due to: %s", shortNameOfType[IMStateSnapshot], imEvent, noUpdateReason)
     }
 
+    DEBUG("New %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
+
     logSeparator()
   }
 
index 8b8c815..4a5b56c 100644 (file)
@@ -144,7 +144,7 @@ case class UserState(
     occurredMillis: Long, // The time fro which this state is relevant
 
     // The last known change reason for this userState
-    lastChangeReason: UserStateChangeReason = NoSpecificChangeReason,
+    lastChangeReason: UserStateChangeReason = NoSpecificChangeReason(),
     // The user state we used to compute this one. Normally the (cached)
     // state at the beginning of the billing period.
     parentUserStateId: Option[String] = None,
@@ -248,7 +248,7 @@ object UserState {
       OwnedResourcesSnapshot.Empty,
       Nil,
       userCreationMillis,
-      InitialUserStateSetup
+      InitialUserStateSetup()
     )
   }
 
index 54fc9dc..63adac2 100644 (file)
@@ -36,7 +36,7 @@
 package gr.grnet.aquarium.computation
 
 /**
- * Used to bootstrao the user state.
+ * This is used to bootstrap the user state.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
index 8649d95..a3a1d84 100644 (file)
@@ -69,7 +69,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
     val clog = ContextualLogger.fromOther(
       clogOpt,
       logger,
-      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
+      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
     clog.begin()
 
     def doCompute: UserState = {
@@ -84,8 +84,8 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
     val userID = userStateBootstrap.userID
     val userCreationMillis = userStateBootstrap.userCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
-    val billingMonthStartMillis = billingMonthInfo.startMillis
-    val billingMonthStopMillis = billingMonthInfo.stopMillis
+    val billingMonthStartMillis = billingMonthInfo.monthStartMillis
+    val billingMonthStopMillis = billingMonthInfo.monthStopMillis
 
     if(billingMonthStopMillis < userCreationMillis) {
       // If the user did not exist for this billing month, piece of cake
@@ -323,7 +323,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
   def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData,
                            billingMonthInfo: BillingMonthInfo,
                            defaultResourcesMap: DSLResourcesMap,
-                           calculationReason: UserStateChangeReason = NoSpecificChangeReason,
+                           calculationReason: UserStateChangeReason,
                            clogOpt: Option[ContextualLogger] = None): UserState = {
 
     val userID = userStateBootstrap.userID
@@ -331,7 +331,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
     val clog = ContextualLogger.fromOther(
       clogOpt,
       logger,
-      "doFullMonthlyBilling(%s)", billingMonthInfo)
+      "doFullMonthlyBilling(%s)", billingMonthInfo.toShortDebugString)
     clog.begin()
 
     val clogSome = Some(clog)
@@ -340,15 +340,15 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
       userStateBootstrap,
       billingMonthInfo.previousMonth,
       defaultResourcesMap,
-      calculationReason.forPreviousBillingMonth,
+      calculationReason.forBillingMonthInfo(billingMonthInfo.previousMonth),
       clogSome
     )
 
     val startingUserState = previousBillingMonthUserState
 
 
-    val billingMonthStartMillis = billingMonthInfo.startMillis
-    val billingMonthEndMillis = billingMonthInfo.stopMillis
+    val billingMonthStartMillis = billingMonthInfo.monthStartMillis
+    val billingMonthEndMillis = billingMonthInfo.monthStopMillis
 
     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
     // specified in the parameters.
index 0ac8119..1b3d90e 100644 (file)
@@ -53,7 +53,12 @@ case class IMStateSnapshot(
                            /**
                             * The earliest activation time, if it exists.
                             */
-                           userActivationMillis: Option[Long],
+                           userEarliestActivationMillis: Option[Long],
+
+                           /**
+                            * The user creation time, if it exists
+                            */
+                           userCreationMillis: Option[Long],
 
                            /**
                             * This is the recorded role history
@@ -64,22 +69,101 @@ case class IMStateSnapshot(
    * True iff the user has ever been activated even once.
    */
   def hasBeenActivated: Boolean = {
-    userActivationMillis.isDefined
+    userEarliestActivationMillis.isDefined
+  }
+
+  def hasBeenCreated: Boolean = {
+    userCreationMillis.isDefined
+  }
+
+  /**
+   * Given the newly arrived event, we compute the updated user earliest activation time, if any.
+   * A known time is replaced only by an `active` event that occurred earlier than it (fixes an inverted check).
+   */
+  private[this] def updatedEarliestActivationTime(imEvent: IMEventModel): Option[Long] = {
+    this.userEarliestActivationMillis match {
+      case Some(activationMillis) if imEvent.isStateActive && imEvent.occurredMillis < activationMillis ⇒
+        Some(imEvent.occurredMillis)
+
+      case None if imEvent.isStateActive ⇒
+        Some(imEvent.occurredMillis)
+
+      case other ⇒
+        other
+    }
+  }
 
-  def updateRoleHistoryWithEvent(imEvent: IMEventModel) = {
-    copy(
-      userActivationMillis = if(imEvent.isStateActive) Some(imEvent.occurredMillis) else this.userActivationMillis,
-      latestIMEvent = imEvent,
-      roleHistory   = this.roleHistory.updateWithRole(imEvent.role, imEvent.occurredMillis)
+  /**
+   * Given the newly arrived event, we compute the updated user creation time, if any.
+   * Only the first `create` event triggers an actual update.
+   */
+  private[this] def updatedCreationTime(imEvent: IMEventModel): Option[Long] = {
+    // Allow only the first `create` event
+    if(this.userCreationMillis.isDefined) {
+      this.userCreationMillis
+    } else if(imEvent.isCreateUser) {
+      Some(imEvent.occurredMillis)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Given the newly arrived event, we compute the updated role history.
+   */
+  private[this] def updatedRoleHistory(imEvent: IMEventModel): RoleHistory = {
+    this.roleHistory.updatedWithRole(imEvent.role, imEvent.occurredMillis)
+  }
+
+  /**
+   * Computes an updated state and returns a tuple made of four elements:
+   * a) the updated state, b) a `Boolean` indicating whether the user creation
+   * time has changed, c) a `Boolean` indicating whether the user activation
+   * time has changed and d) a `Boolean` indicating whether the user
+   * role history has changed.
+   *
+   * The role history is updated only if the `roleCheck` is not `None` and
+   * the role it represents is different than the role of the `imEvent`.
+   * The motivation for `roleCheck` is to use this method in a loop (as in replaying
+   * events from the [[gr.grnet.aquarium.store.IMEventStore]]).
+   */
+  def updatedWithEvent(imEvent: IMEventModel,
+                       roleCheck: Option[String]): (IMStateSnapshot, Boolean, Boolean, Boolean) = {
+    // Things of interest that may change by the imEvent:
+    // - user creation time
+    // - user activation time
+    // - user role
+
+    val newCreationTime = updatedCreationTime(imEvent)
+    val creationTimeChanged = this.userCreationMillis != newCreationTime
+
+    val newActivationTime = updatedEarliestActivationTime(imEvent)
+    val activationTimeChanged = this.userEarliestActivationMillis != newActivationTime
+
+    val (roleChanged, newRoleHistory) = roleCheck match {
+      case Some(role) if role != imEvent.role ⇒
+        (true, updatedRoleHistory(imEvent))
+
+      case _ ⇒
+        (false, this.roleHistory)
+    }
+
+    val newState = this.copy(
+      latestIMEvent      = imEvent,
+      userCreationMillis = newCreationTime,
+      userEarliestActivationMillis = newActivationTime,
+      roleHistory = newRoleHistory
     )
+
+    (newState, creationTimeChanged, activationTimeChanged, roleChanged)
   }
 
   override def toString = {
-    "%s(\n!! %s\n!! %s\n!! %s)".format(
+    "%s(\n!! %s\n!! %s\n!! %s\n!! %s)".format(
       shortClassNameOf(this),
       latestIMEvent,
-      userActivationMillis.map(new MutableDateCalc(_)),
+      userCreationMillis.map(new MutableDateCalc(_)),
+      userEarliestActivationMillis.map(new MutableDateCalc(_)),
       roleHistory
     )
   }
@@ -90,6 +174,7 @@ object IMStateSnapshot {
     IMStateSnapshot(
       imEvent,
       if(imEvent.isStateActive) Some(imEvent.occurredMillis) else None,
+      if(imEvent.isCreateUser) Some(imEvent.occurredMillis) else None,
       RoleHistory.initial(imEvent.role, imEvent.occurredMillis))
   }
 }
index 59b2be9..fdee408 100644 (file)
@@ -59,7 +59,7 @@ case class RoleHistory(
     TreeMap(roles.map(role ⇒ (role.timeslot, role)): _*)
   }
 
-  def updateWithRole(role: String, validFrom: Long) = {
+  def updatedWithRole(role: String, validFrom: Long) = {
     // TODO: Review this when Timeslot is also reviewed.
     //       Currently, we need `fixValidTo` because Timeslot does not validate when `validFrom` and `validTo`
     //       are equal.
index e457199..37eb501 100644 (file)
@@ -37,8 +37,74 @@ package gr.grnet.aquarium.computation.reason
 
 import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.util.shortClassNameOf
 
-sealed trait UserStateChangeReason {
+/**
+ * Provides information explaining the reason Aquarium calculated a new [[gr.grnet.aquarium.computation.UserState]].
+ */
+case class UserStateChangeReason(
+    parentReason: Option[UserStateChangeReason],
+    billingMonthInfo: Option[BillingMonthInfo],
+    details: Map[String, Any]
+) {
+  // A reason is only valid if its details carry the name of its type.
+  require(
+    details.contains(UserStateChangeReason.Names.`type`),
+    "No type present in the details of %s".format(shortClassNameOf(this))
+  )
+  // Reads a Boolean flag from the details; `default` is used if it is missing or not a Boolean.
+  private[this] def booleanFromDetails(name: String, default: Boolean): Boolean = {
+    details.get(name) match {
+      case Some(value: Boolean) ⇒
+        value
+
+      case _ ⇒
+        default
+    }
+  }
+
+  /**
+   * Return `true` if the result of the calculation should be stored back to the
+   * [[gr.grnet.aquarium.store.UserStateStore]]. This is `false` when the details
+   * do not carry the respective flag.
+   */
+  def shouldStoreUserState: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.shouldStoreUserState, false)
+
+  def shouldStoreCalculatedWalletEntries: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.shouldStoreCalculatedWalletEntries, false)
+
+  def calculateCreditsForImplicitlyTerminated: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.calculateCreditsForImplicitlyTerminated, false)
+
+  def forBillingMonthInfo(bmi: BillingMonthInfo): UserStateChangeReason = {
+    copy(
+      parentReason = Some(this),
+      billingMonthInfo = Some(bmi)
+    )
+  }
+
+  def `type`: String = {
+    // The key is always present, since the constructor `require`s it
+    details(UserStateChangeReason.Names.`type`).asInstanceOf[String]
+  }
+}
+
+object UserStateChangeReason {
+  object Names {
+    final val `type` = "type"
+
+    final val imEvent = "imEvent"
+    final val forWhenMillis = "forWhenMillis"
+
+    final val shouldStoreUserState = "shouldStoreUserState"
+    final val shouldStoreCalculatedWalletEntries = "shouldStoreCalculatedWalletEntries"
+    final val calculateCreditsForImplicitlyTerminated = "calculateCreditsForImplicitlyTerminated"
+  }
+}
+
+sealed trait UserStateChangeReason_ {
+  def originalReason: UserStateChangeReason_
   /**
     * Return `true` if the result of the calculation should be stored back to the
     * [[gr.grnet.aquarium.store.UserStateStore]].
@@ -48,104 +114,116 @@ sealed trait UserStateChangeReason {
 
   def shouldStoreCalculatedWalletEntries: Boolean
 
-  def forPreviousBillingMonth: UserStateChangeReason
+  def forPreviousBillingMonth: UserStateChangeReason_
 
   def calculateCreditsForImplicitlyTerminated: Boolean
 
   def code: UserStateChangeReasonCodes.ChangeReasonCode
 }
 
-/**
- * When the user state is initially set up.
- */
-case object InitialUserStateSetup extends UserStateChangeReason {
-  def shouldStoreUserState = true
-
-  def shouldStoreCalculatedWalletEntries = false
-
-  def forPreviousBillingMonth = this
+object InitialUserStateSetup {
+  def `type` = "InitialUserStateSetup"
 
-  def calculateCreditsForImplicitlyTerminated = false
-
-  def code = UserStateChangeReasonCodes.InitialSetupCode
+  /**
+   * When the user state is initially set up.
+   */
+  def apply() = {
+    UserStateChangeReason(
+      None,
+      None,
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`,
+        UserStateChangeReason.Names.shouldStoreUserState -> true
+      )
+    )
+  }
 }
 
-/**
- * When the user processing unit (actor) is initially set up.
- */
-case object InitialUserActorSetup extends UserStateChangeReason {
-  def shouldStoreUserState = true
-
-  def shouldStoreCalculatedWalletEntries = false
-
-  def forPreviousBillingMonth = this
+object InitialUserActorSetup {
+  def `type` = "InitialUserActorSetup"
 
-  def calculateCreditsForImplicitlyTerminated = false
-
-  def code = UserStateChangeReasonCodes.InitialUserActorSetup
+  /**
+   * When the user processing unit (actor) is initially set up.
+   */
+  def apply() = {
+    UserStateChangeReason(
+      None,
+      None,
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`,
+        UserStateChangeReason.Names.shouldStoreUserState -> true
+      )
+    )
+  }
 }
-/**
- * A calculation made for no specific reason. Can be for testing, for example.
- *
- */
-case object NoSpecificChangeReason extends UserStateChangeReason {
-  def shouldStoreUserState = false
-
-  def shouldStoreCalculatedWalletEntries = false
-
-  def forBillingMonthInfo(bmi: BillingMonthInfo) = this
 
-  def forPreviousBillingMonth = this
+object NoSpecificChangeReason {
+  def `type` = "NoSpecificChangeReason"
 
-  def calculateCreditsForImplicitlyTerminated = false
-
-  def code = UserStateChangeReasonCodes.NoSpecificChangeCode
+  /**
+   * A calculation made for no specific reason. Can be for testing, for example.
+   */
+  def apply() = {
+    UserStateChangeReason(
+      None,
+      None,
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`
+      )
+    )
+  }
 }
 
-/**
- * An authoritative calculation for the billing period.
- *
- * This marks a state for caching.
- *
- * @param billingMonthInfo
- */
-case class MonthlyBillingCalculation(billingMonthInfo: BillingMonthInfo) extends UserStateChangeReason {
-  def shouldStoreUserState = true
-
-  def shouldStoreCalculatedWalletEntries = true
-
-  def forPreviousBillingMonth = MonthlyBillingCalculation(billingMonthInfo.previousMonth)
-
-  def calculateCreditsForImplicitlyTerminated = true
+object MonthlyBillingCalculation {
+  def `type` = "MonthlyBillingCalculation"
 
-  def code = UserStateChangeReasonCodes.MonthlyBillingCode
+  /**
+   * An authoritative calculation for the billing period.
+   */
+  def apply(parentReason: UserStateChangeReason, billingMongthInfo: BillingMonthInfo) = {
+    UserStateChangeReason(
+      Some(parentReason),
+      Some(billingMongthInfo),
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`,
+        UserStateChangeReason.Names.shouldStoreUserState -> true,
+        UserStateChangeReason.Names.shouldStoreCalculatedWalletEntries -> true,
+        UserStateChangeReason.Names.calculateCreditsForImplicitlyTerminated -> true
+      )
+    )
+  }
 }
 
-/**
- * Used for the realtime billing calculation.
- *
- * @param forWhenMillis The time this calculation is for
- */
-case class RealtimeBillingCalculation(forWhenMillis: Long) extends UserStateChangeReason {
-  def shouldStoreUserState = false
-
-  def shouldStoreCalculatedWalletEntries = false
-
-  def forPreviousBillingMonth = this
+object RealtimeBillingCalculation {
+  def `type` = "RealtimeBillingCalculation"
 
-  def calculateCreditsForImplicitlyTerminated = false
-
-  def code = UserStateChangeReasonCodes.RealtimeBillingCode
+  /**
+   * Used for the real-time billing calculation.
+   */
+  def apply(parentReason: Option[UserStateChangeReason], forWhenMillis: Long) = {
+    UserStateChangeReason(
+      parentReason,
+      None,
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`,
+        UserStateChangeReason.Names.forWhenMillis -> forWhenMillis
+      )
+    )
+  }
 }
 
-case class IMEventArrival(imEvent: IMEventModel) extends UserStateChangeReason {
-  def shouldStoreUserState = true
-
-  def shouldStoreCalculatedWalletEntries = false
-
-  def forPreviousBillingMonth = this
-
-  def calculateCreditsForImplicitlyTerminated = false
-
-  def code = UserStateChangeReasonCodes.IMEventArrivalCode
+object IMEventArrival {
+  def `type` = "IMEventArrival"
+
+  def apply(imEvent: IMEventModel) = {
+    UserStateChangeReason(
+      None,
+      None,
+      Map(
+        UserStateChangeReason.Names.`type` -> `type`,
+        UserStateChangeReason.Names.imEvent -> imEvent,
+        UserStateChangeReason.Names.shouldStoreUserState -> true
+      )
+    )
+  }
 }
index 4ad5a10..173dca2 100644 (file)
@@ -97,8 +97,9 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
         // Let's decide if it is OK to store the event
         // Remember that OK == None as the returning result
         //
-        // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
-        //       do not bother to catch exceptions here.
+        // NOTE: If anything goes wrong with this function, then the handler
+        //       (handlePayload in GenericPayloadHandler) will issue a Resend,
+        //       so do not bother to catch exceptions here.
 
         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
         //    It is a requirement that this ID is unique.
@@ -117,22 +118,18 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
             //    Sorry. We cannot tolerate out-of-order events here, since they really mess with the
             //    agreements selection and thus with the charging procedure.
             //
-            // 2.1 The only exception is the very first activation ever. We allow late arrival, since
+            // 2.1 The only exception is the user creation event. We allow late arrival, since
             //     the rest of Aquarium does nothing (but accumulate events) if the user has never
-            //     been activated.
+            //     been properly created (this behavior may be helpful to devops).
             //
             // TODO: We really need to store these bad events anyway but somewhere else (BadEventsStore?)
-            val userID = imEvent.userID
-
-            store.findLatestIMEventByUserID(userID) match {
-              case Some(latestStoredEvent) ⇒
+            def checkOlder(): Option[HandlerResult] = {
+              store.findLatestIMEventByUserID(imEvent.userID) match {
+                case Some(latestStoredEvent) ⇒
+                  val occurredMillis       = imEvent.occurredMillis
+                  val latestOccurredMillis = latestStoredEvent.occurredMillis
 
-               val occurredMillis       = imEvent.occurredMillis
-               val latestOccurredMillis = latestStoredEvent.occurredMillis
-
-               if(occurredMillis < latestOccurredMillis) {
-                  // OK this is older than our most recent event. Essentially a glimpse in the past.
-                  def rejectMessage = {
+                  if(occurredMillis < latestOccurredMillis) {
                     val occurredDebugString       = new MutableDateCalc(occurredMillis).toYYYYMMDDHHMMSSSSS
                     val latestOccurredDebugString = new MutableDateCalc(latestOccurredMillis).toYYYYMMDDHHMMSSSSS
 
@@ -142,35 +139,43 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
                       latestOccurredDebugString
                     )
 
-                    logger.debug(formatter("Rejecting newer %s. [%s] < [%s]"))
+                    logger.debug(formatter("Rejecting older %s. [%s] < [%s]"))
 
-                    Some(HandlerResultReject(formatter("Newer %s. [%s] < [%s]")))
+                    Some(HandlerResultReject(formatter("Older %s. [%s] < [%s]")))
+                  } else {
+                    None
                   }
 
-                  // Has the user been activated before?
-                  store.findFirstIsActiveIMEventByUserID(userID) match {
-                    case Some(_) ⇒
-                      // Yes, so the new event must be rejected
-                      rejectMessage
-
-                    case None ⇒
-                      // No. Process the new event only if it is an activation.
-                      if(imEvent.isActive) {
-                       logger.info("First activation %s".format(imEventDebugString))
-                       acceptMessage
-                      } else {
-                       rejectMessage
-                      }
-                  }
-               } else {
-                 // We accept all newer events
-                 acceptMessage
-               }
-
-              case None ⇒
-                // This is the very first event ever
-                logger.info("First ever %s".format(imEventDebugString))
-                acceptMessage
+                case None ⇒
+                  None
+              }
+            }
+            val userID = imEvent.userID
+
+            val userHasBeenCreated = store.findCreateIMEventByUserID(userID).isDefined
+            val isCreateUser       = imEvent.isCreateUser
+
+            (userHasBeenCreated, isCreateUser) match {
+              case (true, true) ⇒
+                // (User CREATEd, CREATE event)
+                val reason = "User is already created. Rejecting %s".format(imEventDebugString)
+                logger.info(reason)
+                Some(HandlerResultReject(reason))
+
+              case (true, false) ⇒
+                // (User CREATEd, MODIFY event)
+                checkOlder()
+
+              case (false, true) ⇒
+                // (User not CREATEd, CREATE event)
+                logger.info("User created by %s".format(imEventDebugString))
+                None
+
+              case (false, false) ⇒
+                // (User not CREATEd, MODIFY event)
+                // We allow any older modification events until the user is created
+                logger.debug("User not created yet. Processing %s".format(imEventDebugString))
+                None
             }
         }
       },
index 90adb24..421daf9 100644 (file)
@@ -98,8 +98,9 @@ class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
         // Let's decide if it is OK to store the event
         // Remember that OK == None as the returning result
         //
-        // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
-        //       do not bother to catch exceptions here.
+        // NOTE: If anything goes wrong with this function, then the handler
+        //       (handlePayload in GenericPayloadHandler) will issue a Resend,
+        //       so do not bother to catch exceptions here.
 
         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
         //    It is a requirement that this ID is unique.
index cabfd66..bb02803 100644 (file)
@@ -52,23 +52,10 @@ import gr.grnet.aquarium.computation.reason.{IMEventArrival, RealtimeBillingCalc
  */
 
 object JsonConversions {
-  final val HintedFormats = new DefaultFormats {
-
-    override val typeHints = ShortTypeHints(
-      List(
-        InitialUserStateSetup.getClass,
-        InitialUserActorSetup.getClass,
-        NoSpecificChangeReason.getClass,
-        classOf[MonthlyBillingCalculation],
-        classOf[RealtimeBillingCalculation],
-        classOf[IMEventArrival]))
-
-    override val typeHintFieldName = "type"
-  }
   /**
    * The application-wide JSON formats used from the underlying lift-json library.
    */
-  implicit final val Formats = (HintedFormats ++ JodaTimeSerializers.all)
+  implicit final val Formats = (DefaultFormats ++ JodaTimeSerializers.all)
 
   /**
    * Converts a value to JSON AST (Abstract Syntax Tree) by acting a bit intelligently, depending on the actual type
index 870efa0..7139cd5 100644 (file)
@@ -64,6 +64,8 @@ trait IMEventModel extends ExternalEventModel {
 
   def isModifyUser = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.modify)
 
+  def userCreationMillisOption = if(isCreateUser) Some(this.occurredMillis) else None
+
   override def toDebugString = {
     "%s(userID=%s, id=%s, isActive=%s, role='%s', occurred=%s)".format(
       shortClassNameOf(this),
index 62e5e62..231049e 100644 (file)
@@ -68,6 +68,11 @@ trait IMEventStore {
    */
   def findIMEventById(id: String): Option[IMEvent]
 
+  /**
+   * Find the `CREATE` event for the given user. Note that there must be only one such event.
+   */
+  def findCreateIMEventByUserID(userID: String): Option[IMEvent]
+
   def findLatestIMEventByUserID(userID: String): Option[IMEvent]
 
   /**
index a269793..26e4bb4 100644 (file)
@@ -277,17 +277,23 @@ class MemStore extends UserStateStore
 
   def findIMEventById(id: String) = imEventById.get(id)
 
+
+  /**
+   * Find the `CREATE` event for the given user. Note that there must be only one such event.
+   */
+  def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
+    imEventById.valuesIterator.filter { e ⇒
+      e.userID == userID && e.isCreateUser
+    }.toList.sortWith { case (e1, e2) ⇒
+      e1.occurredMillis < e2.occurredMillis
+    } headOption
+  }
+
   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
       case (us1, us2) ⇒
         us1.occurredMillis > us2.occurredMillis
-    } match {
-      case head :: _ ⇒
-        Some(head)
-
-      case _ ⇒
-        None
-    }
+    } headOption
   }
 
   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
index 100f68b..0b8d209 100644 (file)
@@ -100,7 +100,7 @@ class MongoDBStore(
     MongoDBResourceEvent.fromOther(event, null)
   }
 
-  def pingResourceEventStore(): Unit = {
+  def pingResourceEventStore(): Unit = synchronized {
     MongoDBStore.ping(mongo)
   }
 
@@ -259,6 +259,27 @@ class MongoDBStore(
     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
   }
 
+
+  /**
+   * Find the `CREATE` event for the given user. Note that there must be only one such event.
+   */
+  def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
+    val query = new BasicDBObjectBuilder().
+      add(IMEventNames.userID, userID).
+      add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
+
+    // Normally one such event is allowed ...
+    val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
+
+    withCloseable(cursor) { cursor ⇒
+      if(cursor.hasNext) {
+        Some(MongoDBIMEvent.fromDBObject(cursor.next()))
+      } else {
+        None
+      }
+    }
+  }
+
   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
     val query = new BasicDBObject(IMEventNames.userID, userID)
     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
@@ -391,7 +412,7 @@ object MongoDBStore {
     PolicyEntry.fromJson(JSON.serialize(dbObj))
   }
 
-  def ping(mongo: Mongo): Unit = {
+  def ping(mongo: Mongo): Unit = synchronized {
     // This requires a network roundtrip
     mongo.isLocked
   }
index 4ab4e58..c692b49 100644 (file)
@@ -47,8 +47,8 @@ import org.junit.{Assert, Ignore, Test}
 import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler}
 import gr.grnet.aquarium.{AquariumException}
 import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance}
-import gr.grnet.aquarium.computation.reason.MonthlyBillingCalculation
 import gr.grnet.aquarium.computation.{UserStateBootstrappingData, UserState, BillingMonthInfo, UserStateComputations}
+import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, MonthlyBillingCalculation}
 
 
 /**
@@ -305,7 +305,7 @@ aquariumpolicy:
       UserStateBootstrap,
       billingMonthInfo,
       DefaultResourcesMap,
-      MonthlyBillingCalculation(billingMonthInfo),
+      MonthlyBillingCalculation(NoSpecificChangeReason(), billingMonthInfo),
       Some(clog)
     )
   }