import gr.grnet.aquarium.actor.message.GetUserWalletResponse
import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
-import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
import gr.grnet.aquarium.charging.bill.AbstractBillEntry
import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
import gr.grnet.aquarium.computation.BillingMonthInfo
*/
class UserActor extends ReflectiveRoleableActor {
+ private[this] var _isFirstMessage = true
+ private[this] var _rcMsgCount = 0
+ private[this] var _imMsgCount = 0
private[this] var _userID: String = "<?>"
private[this] var _userState: UserStateModel = _
private[this] var _userCreationIMEvent: IMEventMsg = _
* Creates the initial state that is related to IMEvents.
*/
private[this] def initializeStateOfIMEvents(): Unit = {
- // NOTE: this._userID is already set up by onInitializeUserActorState()
+ // NOTE: this._userID is already set up our caller
aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
DEBUG("Replaying %s", imEvent)
updateAgreementHistoryFrom(imEvent)
updateLatestIMEventIDFrom(imEvent)
}
-
- if(haveAgreements) {
- DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString)
- logSeparator()
- }
}
/**
}
}
- private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
+ private[this] def initializeStateOfResourceEvents(): Unit = {
if(!this.haveAgreements) {
- DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
+ DEBUG("No agreements to initialize resources state")
return
}
if(!this.haveUserCreationIMEvent) {
- DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
+ DEBUG("No CREATE IMEvent to initialize resources state")
return
}
}
}
- def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
- this._userID = event.userID
- DEBUG("Got %s", event)
+ /**
+ * Initializes the actor state from DB.
+ */
+ def initializeUserActorState(userID: String): Unit = {
+ this._userID = userID
+
+ DEBUG("Initializing user actor state")
initializeStateOfIMEvents()
- initializeStateOfResourceEvents(event)
+ initializeStateOfResourceEvents()
}
/**
* When this method is called, we assume that all proper checks have been made and it
* is OK to proceed with the event processing.
*/
- def onIMEventMsg(imEvent: IMEventMsg): Unit = {
+ def onIMEventMsg(imEvent: IMEventMsg) {
+ if(this._isFirstMessage) {
+ initializeUserActorState(imEvent.getUserID)
+ // we ignore this event, since it is already saved in the store and all messages in
+ // the store have been consulted by initializeUserActorState()
+ this._isFirstMessage = false
+ return
+ }
+
val hadUserCreationIMEvent = haveUserCreationIMEvent
if(!haveAgreements) {
- // This IMEvent has arrived after any ResourceEvents
- INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
initializeStateOfIMEvents()
}
else {
- if(this._latestIMEventOriginalID == imEvent.getOriginalID) {
- // This happens when the actor is brought to life, then immediately initialized, and then
- // sent the first IM event. But from the initialization procedure, this IM event will have
- // already been loaded from DB!
- INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
- logSeparator()
-
- //this._latestIMEventID = imEvent.id
- return
- }
-
updateAgreementHistoryFrom(imEvent)
updateLatestIMEventIDFrom(imEvent)
}
logSeparator()
}
- def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
+ def onResourceEventMsg(rcEvent: ResourceEventMsg) {
+ if(this._isFirstMessage) {
+ initializeUserActorState(rcEvent.getUserID)
+ // we ignore this event, since it is already saved in the store and all messages in
+ // the store have been consulted by initializeUserActorState()
+ this._isFirstMessage = false
+ return
+ }
+
if(!shouldProcessResourceEvents) {
// This means the user has not been created (at least, as far as Aquarium is concerned).
// So, we do not process any resource event
return
}
- // Since the latest resource event per resource is recorded in the user state,
- // we do not need to query the store. Just query the in-memory state.
- // Note: This is a similar situation with the first IMEvent received right after the user
- // actor is created.
- if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) {
- INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
- logSeparator()
-
- return
- }
-
val now = TimeHelpers.nowMillis()
// TODO: Review this and its usage in user state.
// TODO: The assumption is that the resource set increases all the time,