Keeping IMState locally in the UserActor.
<logger name="cc.spray.can" level="INFO"/>
- <logger name="gr.grnet" level="INFO"/>
+ <logger name="gr.grnet" level="DEBUG"/>
<root level="DEBUG">
<appender-ref ref="FILE"/>
extends ActorRole("UserActorRole",
false,
classOf[UserActor],
- Set(classOf[ProcessSetUserID],
- classOf[ProcessResourceEvent],
+ Set(classOf[ProcessResourceEvent],
classOf[ProcessIMEvent],
classOf[RequestUserBalance],
classOf[UserRequestGetState]),
*/
case class ProcessIMEvent(imEvent: IMEventModel) extends RouterMessage
-case class ProcessSetUserID(userID: String) extends RouterMessage
-
case class AdminRequestPingAll() extends RouterMessage
val userActor = _actorProvider.actorForRole(UserActorRole)
UserActorCache.put(userID, userActor)
UserActorSupervisor.supervisor.link(userActor)
- userActor ! ProcessSetUserID(userID)
userActor
}
import gr.grnet.aquarium.user._
import gr.grnet.aquarium.util.shortClassNameOf
-import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.actor.message.service.router._
import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
import gr.grnet.aquarium.event.im.IMEventModel
import akka.config.Supervision.Temporary
import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator}
+import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
/**
*/
class UserActor extends ReflectiveRoleableActor {
- private[this] var _userID: String = _
+ private[this] var _imState: IMStateSnapshot = _
private[this] var _userState: UserState = _
self.lifeCycle = Temporary
+ private[this] def _userID = this._userState.userID
private[this] def _shutmedown(): Unit = {
- if(_haveFullState) {
- UserActorCache.invalidate(this._userID)
+ if(_haveUserState) {
+ UserActorCache.invalidate(_userID)
}
self.stop()
def role = UserActorRole
private[this] def _configurator: Configurator = Configurator.MasterConfigurator
-// private[this] def _userId = _userState.userId
private[this] def _timestampTheshold =
_configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
- private[this] def _haveFullState = {
- (this._userID ne null) && (this._userState ne null)
+ private[this] def _haveUserState = {
+ this._userState ne null
}
- private[this] def _havePartialState = {
- (this._userID ne null) && (this._userState eq null)
+ private[this] def _haveIMState = {
+ this._imState ne null
}
-
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
}
"default"
}
- private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
- this._userID = imEvent.userID
-
- val store = _configurator.storeProvider.userStateStore
- // try find user state. normally should ot exist
- val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
- if(latestUserStateOpt.isDefined) {
- logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
- this._userID,
- shortClassNameOf(imEvent),
- imEvent.eventType))
-
- return
- }
-
- val initialAgreementName = _getAgreementNameForNewUser(imEvent)
- val newUserState = DefaultUserStateComputations.createInitialUserState(
- this._userID,
- imEvent.occurredMillis,
- imEvent.isActive,
- 0.0,
- List(imEvent.role),
- initialAgreementName)
-
- this._userState = newUserState
-
- // FIXME: If this fails, then the actor must be shut down.
- store.insertUserState(newUserState)
- }
-
- private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
- val now = TimeHelpers.nowMillis()
-
- if(!_haveFullState) {
- ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
- _shutmedown()
- return
- }
-
- this._userState = this._userState.modifyFromIMEvent(imEvent, now)
- }
-
- def onProcessSetUserID(event: ProcessSetUserID): Unit = {
- this._userID = event.userID
- }
-
def onProcessIMEvent(event: ProcessIMEvent): Unit = {
val now = TimeHelpers.nowMillis()
val imEvent = event.imEvent
- // If we already have a userID but it does not match the incoming userID, then this is an internal error
- if(_havePartialState && (this._userID != imEvent.userID)) {
- throw new AquariumInternalError(
- "Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
- }
-
- // If we get an IMEvent without having a user state, then we query for the latest user state.
- if(!_haveFullState) {
- val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
- this._userState = userStateOpt match {
- case Some(userState) ⇒
- userState
-
- case None ⇒
- val initialAgreementName = _getAgreementNameForNewUser(imEvent)
- val initialUserState = DefaultUserStateComputations.createInitialUserState(
- this._userID,
- imEvent.occurredMillis,
- imEvent.isActive,
- 0.0,
- List(imEvent.role),
- initialAgreementName)
-
- DEBUG("Got initial state")
- initialUserState
- }
- }
+ val isUpdate = if(_haveIMState) {
+ val newOccurredMillis = imEvent.occurredMillis
+ val currentOccurredMillis = this._imState.imEvent.occurredMillis
- if(imEvent.isModifyUser && this._userState.isInitial) {
- INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
- return
- }
-
- if(imEvent.isCreateUser && !this._userState.isInitial) {
- INFO("Got a '%s' but my state is not initial", imEvent.eventType)
- return
- }
+ if(newOccurredMillis < currentOccurredMillis) {
+ INFO(
+ "Ignoring older IMEvent: [%s] < [%s]",
+ new MutableDateCalc(newOccurredMillis).toYYYYMMDDHHMMSSSSS,
+ new MutableDateCalc(currentOccurredMillis).toYYYYMMDDHHMMSSSSS)
- this._userState = this._userState.modifyFromIMEvent(imEvent, now)
+ return
+ }
- if(imEvent.isCreateUser) {
- processCreateUser(imEvent)
- } else if(imEvent.isModifyUser) {
- processModifyUser(imEvent)
+ true
} else {
- throw new AquariumException("Cannot interpret %s".format(imEvent))
+ false
}
+
+ this._imState = IMStateSnapshot(imEvent, now)
+ DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
}
def onRequestUserBalance(event: RequestUserBalance): Unit = {
private[this] def D_userID = {
- if(this._userID eq null)
- "<NOT INITIALIZED>" // We always get a userID first
- else
- if(this._userState eq null)
- "%s, NO STATE".format(this._userID)
+ if(this._userState eq null)
+ if(this._imState eq null)
+ "<NOT INITIALIZED>"
else
- "%s".format(this._userID)
+ this._imState.imEvent.userID
+ else
+ this._userState.userID
}
private[this] def DEBUG(fmt: String, args: Any*) =
logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
- logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+ logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
}
import collection.immutable.{TreeMap, SortedMap}
import util.date.MutableDateCalc
import event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.IMEventModel
/**
* Snapshot of data that are user-related.
case class CreditSnapshot(creditAmount: Double, snapshotTime: Long) extends DataSnapshot
-case class RolesSnapshot(roles: List[String], snapshotTime: Long) extends DataSnapshot
+case class IMStateSnapshot(imEvent: IMEventModel, snapshotTime: Long) extends DataSnapshot
+
+//case class RolesSnapshot(roles: List[String], snapshotTime: Long) extends DataSnapshot
/**
* Represents an agreement valid for a specific amount of time. By convention,
* - If the resource is complex, the (name, instanceId) is (DSLResource.name, instance-id)
* - If the resource is simple, the (name, instanceId) is (DSLResource.name, "1")
*
- * @param resource Same as `resource` of [[gr.grnet.aquarium.event.ResourceEvent]]
- * @param instanceId Same as `instanceId` of [[gr.grnet.aquarium.event.ResourceEvent]]
+ * @param resource Same as `resource` of [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
+ * @param instanceId Same as `instanceId` of [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
* @param instanceAmount This is the amount kept for the resource instance.
* The general rule is that an amount saved in a [[gr.grnet.aquarium.user.ResourceInstanceSnapshot]]
- * represents a total value, while a value appearing in a [[gr.grnet.aquarium.event.ResourceEvent]]
+ * represents a total value, while a value appearing in a [[gr.grnet.aquarium.event.resource.ResourceEventModel]]
* represents a difference. How these two values are combined to form the new amount is dictated
* by the underlying [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
* @param snapshotTime
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-case class ActiveStateSnapshot(isActive: Boolean, snapshotTime: Long) extends DataSnapshot
+//case class ActiveStateSnapshot(isActive: Boolean, snapshotTime: Long) extends DataSnapshot
/**
* Keeps the latest resource event per resource instance.
* @param latestResourceEventsSnapshot
* @param billingPeriodResourceEventsCounter
* @param billingPeriodOutOfSyncResourceEventsCounter
- * @param activeStateSnapshot
* @param creditsSnapshot
* @param agreementsSnapshot
- * @param rolesSnapshot
* @param ownedResourcesSnapshot
* @param newWalletEntries
* The wallet entries computed. Not all user states need to holds wallet entries,
* the billing period recorded by `billingPeriodSnapshot`
*/
billingPeriodOutOfSyncResourceEventsCounter: Long,
-
- activeStateSnapshot: ActiveStateSnapshot,
+ imStateSnapshot: IMStateSnapshot,
creditsSnapshot: CreditSnapshot,
agreementsSnapshot: AgreementSnapshot,
- rolesSnapshot: RolesSnapshot,
ownedResourcesSnapshot: OwnedResourcesSnapshot,
newWalletEntries: List[NewWalletEntry],
// The last known change reason for this userState
private[this] def _allSnapshots: List[Long] = {
List(
- activeStateSnapshot.snapshotTime,
- creditsSnapshot.snapshotTime, agreementsSnapshot.snapshotTime, rolesSnapshot.snapshotTime,
+ imStateSnapshot.snapshotTime,
+ creditsSnapshot.snapshotTime, agreementsSnapshot.snapshotTime,
ownedResourcesSnapshot.snapshotTime,
implicitlyIssuedSnapshot.snapshotTime,
latestResourceEventsSnapshot.snapshotTime
def resourcesMap = ownedResourcesSnapshot.toResourcesMap
def modifyFromIMEvent(imEvent: IMEventModel, snapshotMillis: Long): UserState = {
- val changeReason = IMEventArrival(imEvent)
this.copy(
isInitial = false,
- activeStateSnapshot = ActiveStateSnapshot(imEvent.isActive, snapshotMillis),
- rolesSnapshot = RolesSnapshot(List(imEvent.role), snapshotMillis),
- lastChangeReason = changeReason
+ imStateSnapshot = IMStateSnapshot(imEvent, snapshotMillis),
+ lastChangeReason = IMEventArrival(imEvent)
)
}
import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
import gr.grnet.aquarium.logic.accounting.Accounting
import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.AquariumException
import gr.grnet.aquarium.event.{NewWalletEntry}
import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
+import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
class UserStateComputations extends Loggable {
- def createInitialUserState(userId: String,
+ def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
+ if(!imEvent.isCreateUser) {
+ throw new AquariumInternalError(
+ "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
+ }
+
+ val userID = imEvent.userID
+ val userCreationMillis = imEvent.occurredMillis
+ val now = TimeHelpers.nowMillis()
+
+ UserState(
+ true,
+ userID,
+ userCreationMillis,
+ 0L,
+ false,
+ null,
+ ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+ Nil,
+ Nil,
+ LatestResourceEventsSnapshot(List(), now),
+ 0L,
+ 0L,
+ IMStateSnapshot(imEvent, now),
+ CreditSnapshot(credits, now),
+ AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+ OwnedResourcesSnapshot(Nil, now),
+ Nil,
+ InitialUserStateSetup
+ )
+ }
+
+ def createInitialUserState(userID: String,
userCreationMillis: Long,
isActive: Boolean,
credits: Double,
UserState(
true,
- userId,
+ userID,
userCreationMillis,
0L,
false,
LatestResourceEventsSnapshot(List(), now),
0L,
0L,
- ActiveStateSnapshot(isActive, now),
+ IMStateSnapshot(
+ StdIMEvent(
+ "",
+ now, now, userID,
+ "",
+ isActive, roleNames.headOption.getOrElse("default"),
+ "1.0",
+ IMEventModel.EventTypeNames.create, Map()),
+ now
+ ),
CreditSnapshot(credits, now),
AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
- RolesSnapshot(roleNames, now),
OwnedResourcesSnapshot(Nil, now),
Nil,
InitialUserStateSetup
def createInitialUserStateFrom(us: UserState): UserState = {
createInitialUserState(
- us.userID,
- us.userCreationMillis,
- us.activeStateSnapshot.isActive,
+ us.imStateSnapshot.imEvent,
us.creditsSnapshot.creditAmount,
- us.rolesSnapshot.roles,
- us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
- )
+ us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
}
def findUserStateAtEndOfBillingMonth(userId: String,
val UserCKKL = Aquarium.newUser("CKKL", UserCreationDate)
val InitialUserState = Computations.createInitialUserState(
- userId = UserCKKL.userId,
+ userID = UserCKKL.userId,
userCreationMillis = UserCreationDate.getTime,
isActive = true,
credits = 0.0,