import gr.grnet.aquarium.event.model.im.IMEventModel
import gr.grnet.aquarium.{AquariumException, Aquarium}
import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.computation.reason.{InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
+import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
/**
*
def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
}
- private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel): (Boolean, Boolean, String) = {
+ private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
if(haveIMState) {
- val currentRole = this._imState.roleHistory.lastRole.map(_.name).getOrElse(null)
-// logger.debug("Current role = %s".format(currentRole))
+ val (newState,
+ creationTimeChanged,
+ activationTimeChanged,
+ roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
- if(imEvent.role != currentRole) {
-// logger.debug("New role = %s".format(imEvent.role))
- this._imState = this._imState.updateRoleHistoryWithEvent(imEvent)
- (true, false, "")
- } else {
- val noUpdateReason = "Same role '%s'".format(currentRole)
-// logger.debug(noUpdateReason)
- (false, false, noUpdateReason)
- }
+ this._imState = newState
+ (creationTimeChanged, activationTimeChanged, roleChanged)
} else {
this._imState = IMStateSnapshot.initial(imEvent)
- (true, true, "")
+ (
+ imEvent.isCreateUser,
+ true, // first activation status is a change by default??
+ true // first role is a change by default??
+ )
}
}
/**
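- * Creates the IMStateSnapshot and returns the number of updates it made to it.
+ * Creates the IMStateSnapshot by replaying, in occurrence order, the stored IM events of this user.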
*/
- private[this] def createInitialIMState(event: InitializeUserState): Int = {
- val userID = event.userID
+ private[this] def createInitialIMState(): Unit = {
val store = aquarium.imEventStore
- var _updateCount = 0
+ var _roleCheck = None: Option[String]
- store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
+ // this._userID is already set up
+ store.replayIMEventsInOccurrenceOrder(this._userID) { imEvent ⇒
DEBUG("Replaying %s", imEvent)
- val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
- if(updated) {
- _updateCount = _updateCount + 1
- DEBUG("Updated %s for role '%s'", shortNameOfType[IMStateSnapshot], imEvent.role)
- } else {
- DEBUG("Not updated %s due to: %s", shortNameOfType[IMStateSnapshot], noUpdateReason)
- }
- }
+ val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
+ _roleCheck = this._imState.roleHistory.lastRoleName
- if(_updateCount > 0)
- DEBUG("Computed %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
- else
- DEBUG("Not computed %s", shortNameOfType[IMStateSnapshot])
+ DEBUG(
+ "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
+ creationTimeChanged, activationTimeChanged, roleChanged,
+ imEvent
+ )
+ }
- _updateCount
+ DEBUG("New %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
}
/**
- * Resource events are processed only if the user has been activated.
+ * Resource events are processed only if the user has been created.
*/
private[this] def shouldProcessResourceEvents: Boolean = {
- haveIMState && this._imState.hasBeenActivated
+ haveIMState && this._imState.hasBeenCreated
}
private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
- val userActivationMillis = this._imState.userActivationMillis.get
+ val userCreationMillis = this._imState.userCreationMillis.get
val initialRole = this._imState.roleHistory.firstRole.get.name
val userStateBootstrap = UserStateBootstrappingData(
this._userID,
- userActivationMillis,
+ userCreationMillis,
initialRole,
- aquarium.initialAgreementForRole(initialRole, userActivationMillis),
- aquarium.initialBalanceForRole(initialRole, userActivationMillis)
+ aquarium.initialAgreementForRole(initialRole, userCreationMillis),
+ aquarium.initialBalanceForRole(initialRole, userCreationMillis)
)
val userState = userStateComputations.doFullMonthlyBilling(
userStateBootstrap,
BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()),
aquarium.currentResourcesMap,
- InitialUserStateSetup,
+ InitialUserStateSetup(),
None
)
// Final touch: Update role history
if(haveIMState && haveUserState) {
if(this._userState.roleHistory != this._imState.roleHistory) {
- this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup)
+ this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
}
}
}
return
}
- if(!this._imState.hasBeenActivated) {
+ if(!this._imState.hasBeenCreated) {
// Cannot set the initial state!
- DEBUG("Cannot create %s from %s, since user is inactive", shortNameOfType[UserState], event)
+ DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
return
}
}
def onInitializeUserState(event: InitializeUserState): Unit = {
- val userID = event.userID
- this._userID = userID
+ this._userID = event.userID
DEBUG("Got %s", event)
- createInitialIMState(event)
+ createInitialIMState()
createInitialUserState(event)
}
return
}
- val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
+ val (creationTimeChanged,
+ activationTimeChanged,
+ roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
- if(updated) {
- DEBUG("Updated %s = %s", shortClassNameOf(this._imState), this._imState)
+ DEBUG(
+ "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
+ creationTimeChanged, activationTimeChanged, roleChanged,
+ imEvent
+ )
- // Must also update user state
- if(shouldProcessResourceEvents) {
- if(!haveUserState) {
- loadUserStateAndUpdateRoleHistory()
- INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
- } else {
- // Just update role history
- this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
- INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
- }
+ // Must also update user state if we know when in history the life of a user begins
+ if(creationTimeChanged) {
+ if(!haveUserState) {
+ loadUserStateAndUpdateRoleHistory()
+ INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
+ } else {
+ // Just update role history
+ this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
+ INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
}
- } else {
- DEBUG("Not updating %s from %s due to: %s", shortNameOfType[IMStateSnapshot], imEvent, noUpdateReason)
}
+ DEBUG("New %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
+
logSeparator()
}
occurredMillis: Long, // The time fro which this state is relevant
// The last known change reason for this userState
- lastChangeReason: UserStateChangeReason = NoSpecificChangeReason,
+ lastChangeReason: UserStateChangeReason = NoSpecificChangeReason(),
// The user state we used to compute this one. Normally the (cached)
// state at the beginning of the billing period.
parentUserStateId: Option[String] = None,
OwnedResourcesSnapshot.Empty,
Nil,
userCreationMillis,
- InitialUserStateSetup
+ InitialUserStateSetup()
)
}
package gr.grnet.aquarium.computation
/**
- * Used to bootstrao the user state.
+ * This is used to bootstrap the user state.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
val clog = ContextualLogger.fromOther(
clogOpt,
logger,
- "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
+ "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
clog.begin()
def doCompute: UserState = {
val userID = userStateBootstrap.userID
val userCreationMillis = userStateBootstrap.userCreationMillis
val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
- val billingMonthStartMillis = billingMonthInfo.startMillis
- val billingMonthStopMillis = billingMonthInfo.stopMillis
+ val billingMonthStartMillis = billingMonthInfo.monthStartMillis
+ val billingMonthStopMillis = billingMonthInfo.monthStopMillis
if(billingMonthStopMillis < userCreationMillis) {
// If the user did not exist for this billing month, piece of cake
def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData,
billingMonthInfo: BillingMonthInfo,
defaultResourcesMap: DSLResourcesMap,
- calculationReason: UserStateChangeReason = NoSpecificChangeReason,
+ calculationReason: UserStateChangeReason,
clogOpt: Option[ContextualLogger] = None): UserState = {
val userID = userStateBootstrap.userID
val clog = ContextualLogger.fromOther(
clogOpt,
logger,
- "doFullMonthlyBilling(%s)", billingMonthInfo)
+ "doFullMonthlyBilling(%s)", billingMonthInfo.toShortDebugString)
clog.begin()
val clogSome = Some(clog)
userStateBootstrap,
billingMonthInfo.previousMonth,
defaultResourcesMap,
- calculationReason.forPreviousBillingMonth,
+ calculationReason.forBillingMonthInfo(billingMonthInfo.previousMonth),
clogSome
)
val startingUserState = previousBillingMonthUserState
- val billingMonthStartMillis = billingMonthInfo.startMillis
- val billingMonthEndMillis = billingMonthInfo.stopMillis
+ val billingMonthStartMillis = billingMonthInfo.monthStartMillis
+ val billingMonthEndMillis = billingMonthInfo.monthStopMillis
// Keep the working (current) user state. This will get updated as we proceed with billing for the month
// specified in the parameters.
/**
* The earliest activation time, if it exists.
*/
- userActivationMillis: Option[Long],
+ userEarliestActivationMillis: Option[Long],
+
+ /**
+ * The user creation time, if it exists
+ */
+ userCreationMillis: Option[Long],
/**
* This is the recorded role history
* True iff the user has ever been activated even once.
*/
def hasBeenActivated: Boolean = {
- userActivationMillis.isDefined
+ userEarliestActivationMillis.isDefined
+ }
+
+ /**
+  * True iff a user creation time has been recorded in this snapshot.
+  */
+ def hasBeenCreated: Boolean = {
+   !userCreationMillis.isEmpty
+ }
+
+ /**
+  * Given the newly arrived event, computes the updated earliest activation time of the user, if any.
+  *
+  * The recorded time changes only when `imEvent` marks the user as active and either no activation
+  * has been recorded so far or the event occurred earlier than the recorded activation. In every
+  * other case the currently recorded value is returned unchanged.
+  *
+  * @param imEvent the newly arrived event
+  * @return the earliest known activation time after taking `imEvent` into account
+  */
+ private[this] def updatedEarliestActivationTime(imEvent: IMEventModel): Option[Long] = {
+   this.userEarliestActivationMillis match {
+     // Keep the minimum: only an active event that is older than the recorded activation wins.
+     case Some(activationMillis) if imEvent.isStateActive && imEvent.occurredMillis < activationMillis ⇒
+       Some(imEvent.occurredMillis)
+
+     // Nothing recorded yet, so the first active event defines the activation time.
+     case None if imEvent.isStateActive ⇒
+       Some(imEvent.occurredMillis)
+
+     case other ⇒
+       other
+   }
}
- def updateRoleHistoryWithEvent(imEvent: IMEventModel) = {
- copy(
- userActivationMillis = if(imEvent.isStateActive) Some(imEvent.occurredMillis) else this.userActivationMillis,
- latestIMEvent = imEvent,
- roleHistory = this.roleHistory.updateWithRole(imEvent.role, imEvent.occurredMillis)
+ /**
+  * Given the newly arrived event, computes the updated user creation time, if any.
+  *
+  * A creation time that is already known is never replaced, so only the first
+  * `create` event has an effect.
+  */
+ private[this] def updatedCreationTime(imEvent: IMEventModel): Option[Long] = {
+   this.userCreationMillis match {
+     case known @ Some(_) ⇒
+       // Already created: later events cannot move the creation time
+       known
+
+     case None ⇒
+       if(imEvent.isCreateUser) Some(imEvent.occurredMillis) else None
+   }
+ }
+
+ /**
+  * Given the newly arrived event, computes the updated role history by
+  * registering the role of the event as of the time the event occurred.
+  */
+ private[this] def updatedRoleHistory(imEvent: IMEventModel): RoleHistory = {
+   val newRole = imEvent.role
+   val validFromMillis = imEvent.occurredMillis
+
+   this.roleHistory.updatedWithRole(newRole, validFromMillis)
+ }
+
+ /**
+  * Computes the state that results from applying `imEvent` to this snapshot.
+  *
+  * The role history is touched only when `roleCheck` is defined and names a role other than
+  * the role carried by `imEvent`; with `None`, or with the same role, the current history is kept.
+  * Passing the previously seen role as `roleCheck` makes the method usable in a loop (as in
+  * replaying events from the [[gr.grnet.aquarium.store.IMEventStore]]).
+  *
+  * @param imEvent   the newly arrived event; it always becomes the `latestIMEvent` of the result
+  * @param roleCheck the role to compare against the role of `imEvent`, if any
+  * @return a tuple made of a) the updated state, b) whether the user creation time has changed,
+  *         c) whether the earliest activation time has changed and d) whether the role history
+  *         has changed
+  */
+ def updatedWithEvent(imEvent: IMEventModel,
+                      roleCheck: Option[String]): (IMStateSnapshot, Boolean, Boolean, Boolean) = {
+   val creationMillis = updatedCreationTime(imEvent)
+   val activationMillis = updatedEarliestActivationTime(imEvent)
+
+   // A role change is reported only against an explicitly given previous role
+   val roleChanged = roleCheck.exists(_ != imEvent.role)
+   val nextRoleHistory = if(roleChanged) updatedRoleHistory(imEvent) else this.roleHistory
+
+   val nextState = this.copy(
+     latestIMEvent = imEvent,
+     userCreationMillis = creationMillis,
+     userEarliestActivationMillis = activationMillis,
+     roleHistory = nextRoleHistory
)
+
+   (
+     nextState,
+     this.userCreationMillis != creationMillis,
+     this.userEarliestActivationMillis != activationMillis,
+     roleChanged
+   )
}
override def toString = {
- "%s(\n!! %s\n!! %s\n!! %s)".format(
+ "%s(\n!! %s\n!! %s\n!! %s\n!! %s)".format(
shortClassNameOf(this),
latestIMEvent,
- userActivationMillis.map(new MutableDateCalc(_)),
+ userCreationMillis.map(new MutableDateCalc(_)),
+ userEarliestActivationMillis.map(new MutableDateCalc(_)),
roleHistory
)
}
IMStateSnapshot(
imEvent,
if(imEvent.isStateActive) Some(imEvent.occurredMillis) else None,
+ if(imEvent.isCreateUser) Some(imEvent.occurredMillis) else None,
RoleHistory.initial(imEvent.role, imEvent.occurredMillis))
}
}
TreeMap(roles.map(role ⇒ (role.timeslot, role)): _*)
}
- def updateWithRole(role: String, validFrom: Long) = {
+ def updatedWithRole(role: String, validFrom: Long) = {
// TODO: Review this when Timeslot is also reviewed.
// Currently, we need `fixValidTo` because Timeslot does not validate when `validFrom` and `validTo`
// are equal.
import gr.grnet.aquarium.computation.BillingMonthInfo
import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.util.shortClassNameOf
-sealed trait UserStateChangeReason {
+/**
+ * Provides information explaining the reason Aquarium calculated a new [[gr.grnet.aquarium.computation.UserState]].
+ *
+ * @param parentReason     the reason this one was derived from, if any
+ * @param billingMonthInfo the billing month this reason refers to, if any
+ * @param details          free-form attributes of the reason, keyed by the names defined in
+ *                         `UserStateChangeReason.Names`; the `type` key is mandatory
+ */
+case class UserStateChangeReason(
+    parentReason: Option[UserStateChangeReason],
+    billingMonthInfo: Option[BillingMonthInfo],
+    details: Map[String, Any]
+) {
+
+  // Fail fast at construction time: `type` below relies on this key being present.
+  require(
+    details.contains(UserStateChangeReason.Names.`type`),
+    "No type present in the details of %s".format(shortClassNameOf(this))
+  )
+
+  /**
+   * Looks up `name` in the details and returns the stored value if it is a `Boolean`.
+   * When the key is missing, or the stored value is not a `Boolean`, `default` is returned.
+   */
+  private[this] def booleanFromDetails(name: String, default: Boolean): Boolean = {
+    details.get(name) match {
+      case Some(value: Boolean) ⇒
+        value
+
+      case _ ⇒
+        default
+    }
+  }
+
+  /**
+   * Return `true` if the result of the calculation should be stored back to the
+   * [[gr.grnet.aquarium.store.UserStateStore]].
+   *
+   * Defaults to `false` when the details do not say otherwise.
+   */
+  def shouldStoreUserState: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.shouldStoreUserState, false)
+
+  /**
+   * The value of the `shouldStoreCalculatedWalletEntries` detail; `false` when it is absent.
+   */
+  def shouldStoreCalculatedWalletEntries: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.shouldStoreCalculatedWalletEntries, false)
+
+  /**
+   * The value of the `calculateCreditsForImplicitlyTerminated` detail; `false` when it is absent.
+   */
+  def calculateCreditsForImplicitlyTerminated: Boolean =
+    booleanFromDetails(UserStateChangeReason.Names.calculateCreditsForImplicitlyTerminated, false)
+
+  /**
+   * Derives a reason for the given billing month: the result has the same details,
+   * this instance as its parent reason and `bmi` as its billing month.
+   */
+  def forBillingMonthInfo(bmi: BillingMonthInfo): UserStateChangeReason = {
+    copy(
+      parentReason = Some(this),
+      billingMonthInfo = Some(bmi)
+    )
+  }
+
+  /**
+   * The type of this reason, as recorded in the details.
+   *
+   * The key is always present (see the `require` above); the stored value is cast to `String`,
+   * so a non-`String` value results in a `ClassCastException`.
+   */
+  def `type`: String = {
+    // This must be always present
+    details(UserStateChangeReason.Names.`type`).asInstanceOf[String]
+  }
+}
+
+/**
+ * Companion of the `UserStateChangeReason` case class.
+ */
+object UserStateChangeReason {
+ /**
+  * The keys used in the `details` map of a `UserStateChangeReason`.
+  */
+ object Names {
+ // Mandatory key: the case class refuses to be constructed without it
+ final val `type` = "type"
+
+ // Keys for reason-specific payload
+ final val imEvent = "imEvent"
+ final val forWhenMillis = "forWhenMillis"
+
+ // Keys of the boolean flags; the case class reads each of them with `false` as the fallback
+ final val shouldStoreUserState = "shouldStoreUserState"
+ final val shouldStoreCalculatedWalletEntries = "shouldStoreCalculatedWalletEntries"
+ final val calculateCreditsForImplicitlyTerminated = "calculateCreditsForImplicitlyTerminated"
+ }
+}
+
+sealed trait UserStateChangeReason_ {
+ def originalReason: UserStateChangeReason_
/**
* Return `true` if the result of the calculation should be stored back to the
* [[gr.grnet.aquarium.store.UserStateStore]].
def shouldStoreCalculatedWalletEntries: Boolean
- def forPreviousBillingMonth: UserStateChangeReason
+ def forPreviousBillingMonth: UserStateChangeReason_
def calculateCreditsForImplicitlyTerminated: Boolean
def code: UserStateChangeReasonCodes.ChangeReasonCode
}
-/**
- * When the user state is initially set up.
- */
-case object InitialUserStateSetup extends UserStateChangeReason {
- def shouldStoreUserState = true
-
- def shouldStoreCalculatedWalletEntries = false
-
- def forPreviousBillingMonth = this
+object InitialUserStateSetup {
+ def `type` = "InitialUserStateSetup"
- def calculateCreditsForImplicitlyTerminated = false
-
- def code = UserStateChangeReasonCodes.InitialSetupCode
+ /**
+ * When the user state is initially set up.
+ */
+ def apply() = {
+ UserStateChangeReason(
+ None,
+ None,
+ Map(
+ UserStateChangeReason.Names.`type` -> `type`,
+ UserStateChangeReason.Names.shouldStoreUserState -> true
+ )
+ )
+ }
}
-/**
- * When the user processing unit (actor) is initially set up.
- */
-case object InitialUserActorSetup extends UserStateChangeReason {
- def shouldStoreUserState = true
-
- def shouldStoreCalculatedWalletEntries = false
-
- def forPreviousBillingMonth = this
+object InitialUserActorSetup {
+ def `type` = "InitialUserActorSetup"
- def calculateCreditsForImplicitlyTerminated = false
-
- def code = UserStateChangeReasonCodes.InitialUserActorSetup
+ /**
+ * When the user processing unit (actor) is initially set up.
+ */
+ def apply() = {
+ UserStateChangeReason(
+ None,
+ None,
+ Map(
+ UserStateChangeReason.Names.`type` -> `type`,
+ UserStateChangeReason.Names.shouldStoreUserState -> true
+ )
+ )
+ }
}
-/**
- * A calculation made for no specific reason. Can be for testing, for example.
- *
- */
-case object NoSpecificChangeReason extends UserStateChangeReason {
- def shouldStoreUserState = false
-
- def shouldStoreCalculatedWalletEntries = false
-
- def forBillingMonthInfo(bmi: BillingMonthInfo) = this
- def forPreviousBillingMonth = this
+object NoSpecificChangeReason {
+ def `type` = "NoSpecificChangeReason"
- def calculateCreditsForImplicitlyTerminated = false
-
- def code = UserStateChangeReasonCodes.NoSpecificChangeCode
+ /**
+ * A calculation made for no specific reason. Can be for testing, for example.
+ */
+ def apply() = {
+ UserStateChangeReason(
+ None,
+ None,
+ Map(
+ UserStateChangeReason.Names.`type` -> `type`
+ )
+ )
+ }
}
-/**
- * An authoritative calculation for the billing period.
- *
- * This marks a state for caching.
- *
- * @param billingMonthInfo
- */
-case class MonthlyBillingCalculation(billingMonthInfo: BillingMonthInfo) extends UserStateChangeReason {
- def shouldStoreUserState = true
-
- def shouldStoreCalculatedWalletEntries = true
-
- def forPreviousBillingMonth = MonthlyBillingCalculation(billingMonthInfo.previousMonth)
-
- def calculateCreditsForImplicitlyTerminated = true
+object MonthlyBillingCalculation {
+ def `type` = "MonthlyBillingCalculation"
- def code = UserStateChangeReasonCodes.MonthlyBillingCode
+ /**
+ * An authoritative calculation for the billing period.
+ */
+ def apply(parentReason: UserStateChangeReason, billingMongthInfo: BillingMonthInfo) = {
+ UserStateChangeReason(
+ Some(parentReason),
+ Some(billingMongthInfo),
+ Map(
+ UserStateChangeReason.Names.`type` -> `type`,
+ UserStateChangeReason.Names.shouldStoreUserState -> true,
+ UserStateChangeReason.Names.shouldStoreCalculatedWalletEntries -> true,
+ UserStateChangeReason.Names.calculateCreditsForImplicitlyTerminated -> true
+ )
+ )
+ }
}
-/**
- * Used for the realtime billing calculation.
- *
- * @param forWhenMillis The time this calculation is for
- */
-case class RealtimeBillingCalculation(forWhenMillis: Long) extends UserStateChangeReason {
- def shouldStoreUserState = false
-
- def shouldStoreCalculatedWalletEntries = false
-
- def forPreviousBillingMonth = this
+object RealtimeBillingCalculation {
+ def `type` = "RealtimeBillingCalculation"
- def calculateCreditsForImplicitlyTerminated = false
-
- def code = UserStateChangeReasonCodes.RealtimeBillingCode
+ /**
+ * Used for the real-time billing calculation.
+ */
+ def apply(parentReason: Option[UserStateChangeReason], forWhenMillis: Long) = {
+ UserStateChangeReason(
+ parentReason,
+ None,
+ Map(
+ UserStateChangeReason.Names.`type` -> `type`,
+ UserStateChangeReason.Names.forWhenMillis -> forWhenMillis
+ )
+ )
+ }
}
-case class IMEventArrival(imEvent: IMEventModel) extends UserStateChangeReason {
- def shouldStoreUserState = true
-
- def shouldStoreCalculatedWalletEntries = false
-
- def forPreviousBillingMonth = this
-
- def calculateCreditsForImplicitlyTerminated = false
-
- def code = UserStateChangeReasonCodes.IMEventArrivalCode
+object IMEventArrival {
+  def `type` = "IMEventArrival"
+
+  /**
+   * Builds the change reason for the arrival of an IM event. The event itself is
+   * recorded in the details and the reason is flagged so that the user state gets stored.
+   */
+  def apply(imEvent: IMEventModel) = {
+    val details = Map[String, Any](
+      UserStateChangeReason.Names.`type` -> `type`,
+      UserStateChangeReason.Names.imEvent -> imEvent,
+      UserStateChangeReason.Names.shouldStoreUserState -> true
+    )
+
+    // No parent reason and no specific billing month
+    UserStateChangeReason(parentReason = None, billingMonthInfo = None, details = details)
+  }
}
// Let's decide if it is OK to store the event
// Remember that OK == None as the returning result
//
- // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
- // do not bother to catch exceptions here.
+ // NOTE: If anything goes wrong with this function, then the handler
+ // (handlePayload in GenericPayloadHandler) will issue a Resend,
+ // so do not bother to catch exceptions here.
// 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
// It is a requirement that this ID is unique.
// Sorry. We cannot tolerate out-of-order events here, since they really mess with the
// agreements selection and thus with the charging procedure.
//
- // 2.1 The only exception is the very first activation ever. We allow late arrival, since
+ // 2.1 The only exception is the user creation event. We allow late arrival, since
// the rest of Aquarium does nothing (but accumulate events) if the user has never
- // been activated.
+ // been properly created (this behavior may be helpful to devops).
//
// TODO: We really need to store these bad events anyway but somewhere else (BadEventsStore?)
- val userID = imEvent.userID
-
- store.findLatestIMEventByUserID(userID) match {
- case Some(latestStoredEvent) ⇒
+ def checkOlder(): Option[HandlerResult] = {
+ store.findLatestIMEventByUserID(imEvent.userID) match {
+ case Some(latestStoredEvent) ⇒
+ val occurredMillis = imEvent.occurredMillis
+ val latestOccurredMillis = latestStoredEvent.occurredMillis
- val occurredMillis = imEvent.occurredMillis
- val latestOccurredMillis = latestStoredEvent.occurredMillis
-
- if(occurredMillis < latestOccurredMillis) {
- // OK this is older than our most recent event. Essentially a glimpse in the past.
- def rejectMessage = {
+ if(occurredMillis < latestOccurredMillis) {
val occurredDebugString = new MutableDateCalc(occurredMillis).toYYYYMMDDHHMMSSSSS
val latestOccurredDebugString = new MutableDateCalc(latestOccurredMillis).toYYYYMMDDHHMMSSSSS
latestOccurredDebugString
)
- logger.debug(formatter("Rejecting newer %s. [%s] < [%s]"))
+ logger.debug(formatter("Rejecting older %s. [%s] < [%s]"))
- Some(HandlerResultReject(formatter("Newer %s. [%s] < [%s]")))
+ Some(HandlerResultReject(formatter("Older %s. [%s] < [%s]")))
+ } else {
+ None
}
- // Has the user been activated before?
- store.findFirstIsActiveIMEventByUserID(userID) match {
- case Some(_) ⇒
- // Yes, so the new event must be rejected
- rejectMessage
-
- case None ⇒
- // No. Process the new event only if it is an activation.
- if(imEvent.isActive) {
- logger.info("First activation %s".format(imEventDebugString))
- acceptMessage
- } else {
- rejectMessage
- }
- }
- } else {
- // We accept all newer events
- acceptMessage
- }
-
- case None ⇒
- // This is the very first event ever
- logger.info("First ever %s".format(imEventDebugString))
- acceptMessage
+ case None ⇒
+ None
+ }
+ }
+ val userID = imEvent.userID
+
+ val userHasBeenCreated = store.findCreateIMEventByUserID(userID).isDefined
+ val isCreateUser = imEvent.isCreateUser
+
+ (userHasBeenCreated, isCreateUser) match {
+ case (true, true) ⇒
+ // (User CREATEd, CREATE event)
+ val reason = "User is already created. Rejecting %s".format(imEventDebugString)
+ logger.info(reason)
+ Some(HandlerResultReject(reason))
+
+ case (true, false) ⇒
+ // (User CREATEd, MODIFY event)
+ checkOlder()
+
+ case (false, true) ⇒
+ // (User not CREATEd, CREATE event)
+ logger.info("User created by %s".format(imEventDebugString))
+ None
+
+ case (false, false) ⇒
+ // (User not CREATEd, MODIFY event)
+ // We allow any older modification events until the user is created
+ logger.debug("User not created yet. Processing %s".format(imEventDebugString))
+ None
}
}
},
// Let's decide if it is OK to store the event
// Remember that OK == None as the returning result
//
- // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
- // do not bother to catch exceptions here.
+ // NOTE: If anything goes wrong with this function, then the handler
+ // (handlePayload in GenericPayloadHandler) will issue a Resend,
+ // so do not bother to catch exceptions here.
// 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
// It is a requirement that this ID is unique.
*/
object JsonConversions {
- final val HintedFormats = new DefaultFormats {
-
- override val typeHints = ShortTypeHints(
- List(
- InitialUserStateSetup.getClass,
- InitialUserActorSetup.getClass,
- NoSpecificChangeReason.getClass,
- classOf[MonthlyBillingCalculation],
- classOf[RealtimeBillingCalculation],
- classOf[IMEventArrival]))
-
- override val typeHintFieldName = "type"
- }
/**
* The application-wide JSON formats used from the underlying lift-json library.
*/
- implicit final val Formats = (HintedFormats ++ JodaTimeSerializers.all)
+ implicit final val Formats = (DefaultFormats ++ JodaTimeSerializers.all)
/**
* Converts a value to JSON AST (Abstract Syntax Tree) by acting a bit intelligently, depending on the actual type
def isModifyUser = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.modify)
+ /**
+  * The time this event occurred, but only if this is a user creation event.
+  */
+ def userCreationMillisOption = {
+   if(isCreateUser) Some(this.occurredMillis) else None
+ }
+
override def toDebugString = {
"%s(userID=%s, id=%s, isActive=%s, role='%s', occurred=%s)".format(
shortClassNameOf(this),
*/
def findIMEventById(id: String): Option[IMEvent]
+ /**
+ * Find the `CREATE` event for the given user. Note that there must be only one such event.
+ */
+ def findCreateIMEventByUserID(userID: String): Option[IMEvent]
+
def findLatestIMEventByUserID(userID: String): Option[IMEvent]
/**
def findIMEventById(id: String) = imEventById.get(id)
+
+ /**
+  * Find the `CREATE` event for the given user. Note that there must be only one such event;
+  * should several be stored, the one with the smallest `occurredMillis` is returned.
+  *
+  * @param userID the user whose `CREATE` event is looked up
+  * @return the earliest stored `CREATE` event of the user, or `None` if there is none
+  */
+ def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
+   val createEvents = imEventById.valuesIterator.filter { e ⇒
+     e.userID == userID && e.isCreateUser
+   }
+
+   // A single pass keeps the earliest event; on equal timestamps the one met first is kept,
+   // which is what a stable ascending sort followed by `headOption` would give as well.
+   createEvents.reduceLeftOption { (earliest, e) ⇒
+     if(e.occurredMillis < earliest.occurredMillis) e else earliest
+   }
+ }
+
def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
case (us1, us2) ⇒
us1.occurredMillis > us2.occurredMillis
- } match {
- case head :: _ ⇒
- Some(head)
-
- case _ ⇒
- None
- }
+ } headOption
}
def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
MongoDBResourceEvent.fromOther(event, null)
}
- def pingResourceEventStore(): Unit = {
+ def pingResourceEventStore(): Unit = synchronized {
MongoDBStore.ping(mongo)
}
MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
}
+
+ /**
+  * Find the `CREATE` event for the given user. Note that there must be only one such event.
+  */
+ def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
+   val createEventQuery = new BasicDBObjectBuilder().
+     add(IMEventNames.userID, userID).
+     add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
+   val byOccurrenceAscending = new BasicDBObject(IMEventNames.occurredMillis, 1)
+
+   // Normally one such event is allowed; should there be more, the first in ascending
+   // `occurredMillis` order is the one returned.
+   withCloseable(imEvents.find(createEventQuery).sort(byOccurrenceAscending)) { results ⇒
+     if(results.hasNext) Some(MongoDBIMEvent.fromDBObject(results.next())) else None
+   }
+ }
+
def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
val query = new BasicDBObject(IMEventNames.userID, userID)
val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
PolicyEntry.fromJson(JSON.serialize(dbObj))
}
- def ping(mongo: Mongo): Unit = {
+ def ping(mongo: Mongo): Unit = synchronized {
// This requires a network roundtrip
mongo.isLocked
}
import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler}
import gr.grnet.aquarium.{AquariumException}
import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance}
-import gr.grnet.aquarium.computation.reason.MonthlyBillingCalculation
import gr.grnet.aquarium.computation.{UserStateBootstrappingData, UserState, BillingMonthInfo, UserStateComputations}
+import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, MonthlyBillingCalculation}
/**
UserStateBootstrap,
billingMonthInfo,
DefaultResourcesMap,
- MonthlyBillingCalculation(billingMonthInfo),
+ MonthlyBillingCalculation(NoSpecificChangeReason(), billingMonthInfo),
Some(clog)
)
}