import gr.grnet.aquarium.actor._
-import akka.config.Supervision.Temporary
-import gr.grnet.aquarium.Aquarium
-import gr.grnet.aquarium.util.{shortClassNameOf, shortNameOfClass}
import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.computation.data.IMStateSnapshot
+import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
+import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.service.event.BalanceEvent
import gr.grnet.aquarium.event.model.im.IMEventModel
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
-import gr.grnet.aquarium.computation.NewUserState
-import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
+import message._
+import config.AquariumPropertiesLoaded
+import config.InitializeUserActorState
+import event.ProcessIMEvent
+import event.ProcessResourceEvent
+import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
+import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
+import gr.grnet.aquarium.computation.BillingMonthInfo
+import gr.grnet.aquarium.charging.state.UserStateBootstrap
+import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
+import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
+import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
+import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
+import message.GetUserBalanceRequest
+import message.GetUserBalanceResponse
+import message.GetUserBalanceResponseData
+import message.GetUserStateRequest
+import message.GetUserStateResponse
+import message.GetUserWalletRequest
+import message.GetUserWalletResponse
+import message.GetUserWalletResponseData
+import scala.Left
+import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
+import scala.Some
+import scala.Right
+import gr.grnet.aquarium.policy.StdUserAgreement
+import gr.grnet.aquarium.charging.state.UserStateBootstrap
+import gr.grnet.aquarium.charging.bill.BillEntry
/**
*
class UserActor extends ReflectiveRoleableActor {
+ // ID of the user served by this actor. Holds the placeholder "<?>" until
+ // onInitializeUserActorState() assigns the real one.
private[this] var _userID: String = "<?>"
- private[this] var _imState: IMStateSnapshot = _
-// private[this] var _userState: UserState = _
- private[this] var _newUserState: NewUserState = _
+ // Current charging state; null until a charging replay has produced one.
+ private[this] var _workingUserState: WorkingUserState = _
+ // The IMEvent that created the user; null until updateAgreementHistoryFrom() sees one.
+ private[this] var _userCreationIMEvent: IMEventModel = _
+ // Agreements derived from IMEvents; updateAgreementHistoryFrom() appends one per event.
+ private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
+ // ID of the last IMEvent recorded by updateLatestIMEventIDFrom(); used to spot a re-delivered event.
+ private[this] var _latestIMEventID: String = ""
+ // ID of the last resource event recorded by updateLatestResourceEventIDFrom().
+ private[this] var _latestResourceEventID: String = ""
+ // Input for charging replays; null until loadWorkingUserStateAndUpdateAgreementHistory() builds it.
+ private[this] var _userStateBootstrap: UserStateBootstrap = _
+
+ /**
+ * Returns the user ID, failing loudly when none is available.
+ *
+ * NOTE(review): haveUserID only tests for null and _userID starts out as the
+ * placeholder "<?>", so the guard fires only if a null ID has been assigned.
+ *
+ * @throws AquariumInternalError if there is no user ID
+ */
+ def unsafeUserID = {
+ if(!haveUserID) {
+ // The format string has one %s placeholder, so it must be given its argument
+ throw new AquariumInternalError("%s not initialized", shortClassNameOf(this))
+ }
- self.lifeCycle = Temporary
+ this._userID
+ }
- private[this] def _shutmedown(): Unit = {
- if(_haveUserState) {
- UserActorCache.invalidate(_userID)
- }
+ // Lifecycle hook: logs the stop and notifies the Akka service that this actor has stopped.
+ override def postStop() {
+ DEBUG("I am finally stopped (in postStop())")
+ aquarium.akkaService.notifyUserActorPostStop(this)
+ }
- self.stop()
+ // Asks the Akka service to invalidate this actor, provided a user ID is present.
+ // It does not stop the actor itself.
+ private[this] def shutmedown(): Unit = {
+ if(haveUserID) {
+ aquarium.akkaService.invalidateUserActor(this)
+ }
}
+ // Error hook (presumably called by the base actor when handling `message` threw `t` — confirm).
+ // Logs the chain of causes and an error line, then requests invalidation via shutmedown().
override protected def onThrowable(t: Throwable, message: AnyRef) = {
- logChainOfCauses(t)
+ LogHelpers.logChainOfCauses(logger, t)
ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
- _shutmedown()
+ shutmedown()
}
def role = UserActorRole
- private[this] def aquarium: Aquarium = Aquarium.Instance
-
- private[this] def _timestampTheshold =
- aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
+ // Shortcut to the charging service of the running Aquarium instance.
+ private[this] def chargingService = aquarium.chargingService
+ // Function that persists a user state via the user state store. It is handed to
+ // charging replays as their store callback (see onProcessResourceEvent).
+ private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
+ aquarium.userStateStore.insertUserState(userState)
+ }
- private[this] def _haveUserState = {
- this._newUserState ne null
+ // True when _userID is not null.
+ // NOTE(review): _userID is initialised to "<?>", so this holds even before
+ // initialization unless a null ID is assigned — confirm that is intended.
+ @inline private[this] def haveUserID = {
+ this._userID ne null
}
- private[this] def _haveIMState = {
- this._imState ne null
+ // True once a user-creation IMEvent has been recorded.
+ @inline private[this] def haveUserCreationIMEvent = {
+ this._userCreationIMEvent ne null
}
+ // No-op: this actor does nothing when the properties are loaded.
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
}
- def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
+ // True when the agreement history holds at least one agreement.
+ @inline private[this] def haveAgreements = {
+ this._workingAgreementHistory.size > 0
}
- private[this] def createIMState(userID: String): Unit = {
- val store = aquarium.imEventStore
- // TODO: Optimization: Since IMState only records roles, we should incrementally
- // TODO: built it only for those IMEvents that changed the role.
- store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
- logger.debug("Replaying %s".format(imEvent))
+ // True once a working user state has been computed.
+ @inline private[this] def haveWorkingUserState = {
+ this._workingUserState ne null
+ }
- val newState = this._imState match {
- case null ⇒
- IMStateSnapshot.initial(imEvent)
+ // True once the user state bootstrap has been built.
+ @inline private[this] def haveUserStateBootstrap = {
+ this._userStateBootstrap ne null
+ }
- case currentState ⇒
- currentState.copyWithEvent(imEvent)
+ /**
+ * Folds one IMEvent into the IMEvent-derived state.
+ *
+ * A create-user event is remembered as the user creation event; a second one is
+ * an internal error. Every event (create-user or not) then appends an agreement
+ * that starts at the event's occurrence time, is open-ended (Long.MaxValue) and
+ * carries the event's role.
+ *
+ * @param imEvent the event to apply
+ * @throws AquariumInternalError if a user creation event has already been recorded
+ */
+ private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
+ if(imEvent.isCreateUser) {
+ if(haveUserCreationIMEvent) {
+ // Argument order follows the message: first the event we just got, then the one we hold
+ throw new AquariumInternalError(
+ "Got user creation event (id=%s) but I already have one (id=%s)",
+ imEvent.id,
+ this._userCreationIMEvent.id
+ )
}
- this._imState = newState
+ this._userCreationIMEvent = imEvent
}
- DEBUG("Recomputed %s = %s", shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
+ val effectiveFromMillis = imEvent.occurredMillis
+ val role = imEvent.role
+ // The unsafe lookup is made for its side-effect. It is kept outside assert()
+ // because assert is elidable and an elided assert would skip the lookup too.
+ val priceTable = aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis)
+ assert(null ne priceTable)
+
+ val newAgreement = StdUserAgreement(
+ imEvent.id,
+ Some(imEvent.id),
+ effectiveFromMillis,
+ Long.MaxValue,
+ role,
+ PolicyDefinedFullPriceTableRef
+ )
+
+ this._workingAgreementHistory += newAgreement
}
- private[this] def createUserState(userID: String): Unit = {
+ // Remembers the ID of the given IMEvent as the latest one seen.
+ private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
+ this._latestIMEventID = imEvent.id
}
- def onInitializeUserState(event: InitializeUserState): Unit = {
- val userID = event.userID
- this._userID = userID
- DEBUG("Got %s", event)
+ // Remembers the ID of the given resource event as the latest one seen.
+ private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
+ this._latestResourceEventID = rcEvent.id
+ }
+
+ /**
+ * Rebuilds the IMEvent-derived state by replaying the stored IMEvents of this
+ * user in occurrence order. Each replayed event extends the agreement history
+ * and becomes the latest known IMEvent.
+ */
+ private[this] def initializeStateOfIMEvents(): Unit = {
+ val imEventStore = aquarium.imEventStore
+ // Relies on this._userID having been assigned by onInitializeUserActorState()
+ val userID = this._userID
+
+ imEventStore.foreachIMEventInOccurrenceOrder(userID) { replayedEvent ⇒
+ DEBUG("Replaying %s", replayedEvent)
+
+ updateAgreementHistoryFrom(replayedEvent)
+ updateLatestIMEventIDFrom(replayedEvent)
+ }
+
+ // Report the outcome only when the replay left us with agreements
+ val replayProducedAgreements = haveAgreements
+ if(replayProducedAgreements) {
+ val historyJson = this._workingAgreementHistory.toJsonString
+ DEBUG("Initial agreement history %s", historyJson)
+ logSeparator()
+ }
+ }
+
+ /**
+ * Tells whether resource events can be charged right now. All of the following
+ * must hold: a user creation IMEvent has been seen, at least one agreement
+ * exists and the user state bootstrap has been built. Otherwise nothing can be
+ * computed.
+ */
+ private[this] def shouldProcessResourceEvents: Boolean = {
+ if(!haveUserCreationIMEvent) false
+ else if(!haveAgreements) false
+ else haveUserStateBootstrap
+ }
+
+ /**
+ * (Re)computes the working user state by replaying the charging of the current
+ * billing month up to now, building the user state bootstrap first if it does
+ * not exist yet. Afterwards the agreement history of the computed state is
+ * overwritten with the one this actor derived from IMEvents.
+ *
+ * Callers must make sure that agreements and the user creation event exist.
+ */
+ private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
+ assert(this.haveAgreements, "this.haveAgreements")
+ assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
+
+ val creationMillis = this._userCreationIMEvent.occurredMillis
+ val initialRole = this._userCreationIMEvent.role
+ val creationEventID = this._userCreationIMEvent.id
+
+ // The bootstrap is built once and reused by every later replay
+ if(!haveUserStateBootstrap) {
+ val userID = this._userID
+ val initialAgreement = aquarium.initialUserAgreement(initialRole, creationMillis, Some(creationEventID))
+ val initialBalance = aquarium.initialUserBalance(initialRole, creationMillis)
+ this._userStateBootstrap = UserStateBootstrap(userID, creationMillis, initialAgreement, initialBalance)
+ }
+
+ val nowMillis = TimeHelpers.nowMillis()
+ this._workingUserState = chargingService.replayMonthChargingUpTo(
+ BillingMonthInfo.fromMillis(nowMillis),
+ nowMillis,
+ this._userStateBootstrap,
+ aquarium.currentResourceTypesMap,
+ InitialUserActorSetup(),
+ aquarium.userStateStore.insertUserState
+ )
+
+ // All agreement changes are assumed to arrive as IMEvents, which makes
+ // this._workingAgreementHistory the authoritative source: push it into the
+ // freshly computed state.
+ if(haveWorkingUserState) {
+ val computedState = this._workingUserState
+ computedState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
+ DEBUG("Computed working user state %s", computedState.toJsonString)
+ }
+ }
+
+ // Computes the initial working user state during actor initialization.
+ // Does nothing (apart from a debug line) when there are no agreements or no
+ // user creation event; `event` is only used in the log messages.
+ private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
+ if(!this.haveAgreements) {
+ DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
+ return
+ }
+
+ if(!this.haveUserCreationIMEvent) {
+ DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
+ return
+ }
+
+ // We will also need this functionality when receiving IMEvents, so we place it in a method
+ loadWorkingUserStateAndUpdateAgreementHistory()
- createIMState(userID)
- createUserState(userID)
+ // The replay may leave no state behind, hence the check before logging it
+ if(haveWorkingUserState) {
+ DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
+ logSeparator()
+ }
}
- private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
- // FIXME: Implement based on the role
- "default"
+ // Initialization message handler: records the user ID carried by the event,
+ // then builds the IMEvent-derived state followed by the resource-event state.
+ // The order matters: the second step needs the agreements and the user
+ // creation event that the first step collects.
+ def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
+ this._userID = event.userID
+ DEBUG("Got %s", event)
+
+ initializeStateOfIMEvents()
+ initializeStateOfResourceEvents(event)
}
/**
*/
def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
val imEvent = processEvent.imEvent
+ // Captured up front so that, at the end, we can tell whether this very event
+ // made the user creation event known.
+ val hadUserCreationIMEvent = haveUserCreationIMEvent
- if(!_haveIMState) {
- // This is an error. Should have been initialized from somewhere ...
- throw new Exception("Got %s while being uninitialized".format(processEvent))
+ if(!haveAgreements) {
+ // This IMEvent has arrived after any ResourceEvents
+ // No IMEvent-derived state yet: rebuild it by replaying the IMEvent store
+ // (presumably the store already contains this event — confirm).
+ INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
+ initializeStateOfIMEvents()
+ }
+ else {
+ if(this._latestIMEventID == imEvent.id) {
+ // This happens when the actor is brought to life, then immediately initialized, and then
+ // sent the first IM event. But from the initialization procedure, this IM event will have
+ // already been loaded from DB!
+ INFO("Ignoring first %s", imEvent.toDebugString)
+ logSeparator()
+
+ //this._latestIMEventID = imEvent.id
+ return
+ }
+ if(imEvent.isAddCredits) {
+ // NOTE(review): nothing on this path has changed _userCreationIMEvent since
+ // hadUserCreationIMEvent was captured, so this condition looks like it can
+ // never hold — confirm. Also note that this path does not record the event
+ // as the latest IMEvent.
+ if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
+ loadWorkingUserStateAndUpdateAgreementHistory()
+ onHandleAddCreditsEvent(imEvent)
+
+ } else {
+ updateAgreementHistoryFrom(imEvent)
+ updateLatestIMEventIDFrom(imEvent)
+ }
}
- if(this._imState.latestIMEvent.id == imEvent.id) {
- // This happens when the actor is brought to life, then immediately initialized, and then
- // sent the first IM event. But from the initialization procedure, this IM event will have
- // already been loaded from DB!
- DEBUG("Ignoring first %s after birth", imEvent.toDebugString)
- return
+ // Must also update user state if we know when in history the life of a user begins
+ if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
+ INFO("Processing user state, since we had a CREATE IMEvent")
+ loadWorkingUserStateAndUpdateAgreementHistory()
}
- this._imState = this._imState.copyWithEvent(imEvent)
+ logSeparator()
+ }
- DEBUG("Update %s = %s", shortClassNameOf(this._imState), this._imState)
+ /**
+ * Converts an add-credits IMEvent (as sent by astakos) to a regular resource
+ * event and hands it to onProcessResourceEvent().
+ *
+ * The amount is read from the event details under
+ * IMEventModel.DetailsNames.credits and parsed as an Int before being widened
+ * to Double. All other constructor arguments are copied from the IMEvent, with
+ * its event type passed twice.
+ */
+ def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
+ val rawCredits = imEvent.details(IMEventModel.DetailsNames.credits)
+ val creditAmount = rawCredits.toInt.toDouble
+
+ val creditsEvent = new StdResourceEvent(
+ imEvent.id,
+ imEvent.occurredMillis,
+ imEvent.receivedMillis,
+ imEvent.userID,
+ imEvent.clientID,
+ imEvent.eventType,
+ imEvent.eventType,
+ creditAmount,
+ imEvent.eventVersion,
+ imEvent.details
+ )
+
+ val processCreditsEvent = new ProcessResourceEvent(creditsEvent)
+ onProcessResourceEvent(processCreditsEvent)
}
def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
val rcEvent = event.rcEvent
- if(!_haveIMState) {
- // This means the user has not been activated. So, we do not process any resource event
- INFO("Not processing %s", rcEvent.toJsonString)
+ // Charges one resource event against the working user state.
+ //
+ // The event is dropped when the user is not ready for charging or when it has
+ // the same ID as the latest processed resource event. Otherwise it is charged
+ // either in realtime (same billing month as now and strictly newer than the
+ // latest charged event) or by replaying the whole current month. Finally a
+ // BalanceEvent is published if the balance crossed the zero boundary.
+ if(!shouldProcessResourceEvents) {
+ // This means the user has not been created (at least, as far as Aquarium is concerned).
+ // So, we do not process any resource event
+ DEBUG("Not processing %s", rcEvent.toJsonString)
+ logSeparator()
+
+ return
+ }
+
+ // Since the latest resource event per resource is recorded in the user state,
+ // we do not need to query the store. Just query the in-memory state.
+ // Note: This is a similar situation with the first IMEvent received right after the user
+ // actor is created.
+ if(this._latestResourceEventID == rcEvent.id) {
+ INFO("Ignoring first %s", rcEvent.toDebugString)
+ logSeparator()
+
return
}
+
+ val now = TimeHelpers.nowMillis()
+ val currentResourcesMap = aquarium.currentResourceTypesMap
+ val chargingReason = RealtimeChargingReason(None, now)
+
+ val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
+ val nowYear = nowBillingMonthInfo.year
+ val nowMonth = nowBillingMonthInfo.month
+
+ val eventOccurredMillis = rcEvent.occurredMillis
+ val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
+ val eventYear = eventBillingMonthInfo.year
+ val eventMonth = eventBillingMonthInfo.month
+
+ // Out-of-sync path: recompute the working state by replaying the current month
+ def computeBatch(): Unit = {
+ DEBUG("Going for out of sync charging")
+ this._workingUserState = chargingService.replayMonthChargingUpTo(
+ nowBillingMonthInfo,
+ // Take into account that the event may be out-of-sync.
+ // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
+ now max eventOccurredMillis,
+ this._userStateBootstrap,
+ currentResourcesMap,
+ chargingReason,
+ stdUserStateStoreFunc
+ )
+
+ updateLatestResourceEventIDFrom(rcEvent)
+ }
+
+ // In-sync path: charge just this event on top of the existing working state
+ def computeRealtime(): Unit = {
+ DEBUG("Going for in sync charging")
+ chargingService.processResourceEvent(
+ rcEvent,
+ this._workingUserState,
+ chargingReason,
+ nowBillingMonthInfo,
+ true
+ )
+
+ updateLatestResourceEventIDFrom(rcEvent)
+ }
+
+ // Snapshot of the balance before charging, to detect a crossing of zero afterwards
+ val oldTotalCredits = this._workingUserState.totalCredits
+ // FIXME check these
+ if(nowYear != eventYear || nowMonth != eventMonth) {
+ DEBUG(
+ "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
+ nowYear, eventYear,
+ nowMonth, eventMonth
+ )
+ computeBatch()
+ }
+ else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
+ DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
+ DEBUG(
+ "%s < %s",
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
+ )
+ computeRealtime()
+ }
+ else {
+ computeBatch()
+ }
+ val newTotal = this._workingUserState.totalCredits
+ // Publish a BalanceEvent whenever the balance moves between non-negative and
+ // negative. Comparing the two classifications, rather than testing the sign of
+ // the product, also catches transitions that start or end at exactly zero and
+ // applies the same `>= 0` rule as the flag carried by the event.
+ if((oldTotalCredits >= 0) != (newTotal >= 0))
+ aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
+ newTotal>=0)
+ DEBUG("Updated %s", this._workingUserState)
+ logSeparator()
}
+ /**
+ * Replies to the sender with the bill of this user for the requested timeslot,
+ * as computed by BillEntry.fromWorkingUserState. The working user state is
+ * passed along as an Option, being None when no state has been computed yet.
+ *
+ * Any exception is logged and answered with an error response of code 500.
+ */
+ def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
+ try{
+ val timeslot = event.timeslot
+ val state= if(haveWorkingUserState) Some(this._workingUserState) else None
+ val billEntry = BillEntry.fromWorkingUserState(timeslot,this._userID,state)
+ val billData = GetUserBillResponseData(this._userID,billEntry)
+ sender ! GetUserBillResponse(Right(billData))
+ } catch {
+ case e:Exception =>
+ // Go through the actor's logger (which adds the user prefix) instead of stderr
+ ERROR(e, "Could not compute bill for %s", event)
+ sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
+ }
+ }
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
- val userId = event.userID
- // FIXME: Implement
-// self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
+ // Replies to the sender with the balance of this user. The answer depends on
+ // whether the user has been CREATEd and whether a balance state exists.
+ val requestedUserID = event.userID
+ val userIsCreated = haveUserCreationIMEvent
+ val haveBalanceState = haveWorkingUserState
+
+ if(userIsCreated && haveBalanceState) {
+ // CREATEd and charged: report the actual total
+ sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
+ }
+ else if(userIsCreated) {
+ // CREATEd but never charged: fall back to the initial balance for the creation role
+ sender ! GetUserBalanceResponse(
+ Right(
+ GetUserBalanceResponseData(
+ this._userID,
+ aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
+ )))
+ }
+ else if(haveBalanceState) {
+ // A balance state without a CREATE event should not be possible: internal error
+ sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
+ }
+ else {
+ // Neither CREATEd nor charged: the user is completely unknown
+ sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(requestedUserID)), 404/*Not found*/)
+ }
}
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
- val userId = event.userID
- // FIXME: Implement
-// self reply GetUserStateResponse(userId, Right(this._userState))
+ // Replies to the sender with the working user state, or with a 404 error
+ // response when no state has been computed for this user.
+ if(haveWorkingUserState) {
+ sender ! GetUserStateResponse(Right(this._workingUserState))
+ }
+ else {
+ sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
+ }
+ }
+
+ /**
+ * Replies to the sender with the wallet of this user.
+ *
+ * With a working user state the reply carries the total credits and the wallet
+ * entries. Without one, but with a known user creation event, it carries the
+ * initial balance and no entries. Otherwise the reply is a 404 error.
+ */
+ def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
+ haveWorkingUserState match {
+ case true ⇒
+ DEBUG("haveWorkingUserState: %s", event)
+ sender ! GetUserWalletResponse(
+ Right(
+ GetUserWalletResponseData(
+ this._userID,
+ this._workingUserState.totalCredits,
+ this._workingUserState.walletEntries.toList
+ )))
+
+ case false ⇒
+ DEBUG("!haveWorkingUserState: %s", event)
+ haveUserCreationIMEvent match {
+ case true ⇒
+ DEBUG("haveUserCreationIMEvent: %s", event)
+ sender ! GetUserWalletResponse(
+ Right(
+ GetUserWalletResponseData(
+ this._userID,
+ aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
+ Nil
+ )))
+
+ case false ⇒
+ DEBUG("!haveUserCreationIMEvent: %s", event)
+ // Error code written without the stray space, in the same AQU-XXX-NNNN shape as the other codes
+ sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
+ }
+ }
}
private[this] def D_userID = {
}
+ // Logging helpers, one per level. Each formats `fmt` with `args` and logs the
+ // result prefixed with "[<D_userID>] - ". The last overload also passes the
+ // throwable to the logger.
private[this] def DEBUG(fmt: String, args: Any*) =
- logger.debug("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+ logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
private[this] def INFO(fmt: String, args: Any*) =
- logger.info("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+ logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
private[this] def WARN(fmt: String, args: Any*) =
- logger.warn("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+ logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
private[this] def ERROR(fmt: String, args: Any*) =
- logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+ logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
- logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+ logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
}