2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
54 import gr.grnet.aquarium.charging.bill.AbstractBillEntry
55 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
56 import gr.grnet.aquarium.computation.BillingMonthInfo
57 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg}
58 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
59 import gr.grnet.aquarium.service.event.BalanceEvent
60 import gr.grnet.aquarium.util.date.TimeHelpers
61 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
65 * @author Christos KK Loverdos <loverdos@gmail.com>
68 class UserActor extends ReflectiveRoleableActor {
69 private[this] var _userID: String = "<?>"
70 private[this] var _userState: UserStateModel = _
71 private[this] var _userCreationIMEvent: IMEventMsg = _
72 private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
73 private[this] var _latestIMEventOriginalID: String = ""
74 private[this] var _latestResourceEventOriginalID: String = ""
75 private[this] var _userStateBootstrap: UserStateBootstrap = _
79 throw new AquariumInternalError("%s not initialized")
85 override def postStop() {
86 DEBUG("I am finally stopped (in postStop())")
87 aquarium.akkaService.notifyUserActorPostStop(this)
90 private[this] def shutmedown(): Unit = {
92 aquarium.akkaService.invalidateUserActor(this)
96 override protected def onThrowable(t: Throwable, message: AnyRef) = {
97 LogHelpers.logChainOfCauses(logger, t)
98 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
103 def role = UserActorRole
105 private[this] def chargingService = aquarium.chargingService
107 private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
108 aquarium.userStateStore.insertUserState(userState)
111 @inline private[this] def haveUserID = {
115 @inline private[this] def haveUserCreationIMEvent = {
116 this._userCreationIMEvent ne null
119 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
122 @inline private[this] def haveAgreements = {
123 (this._userAgreementHistoryModel ne null) && this._userAgreementHistoryModel.size > 0
126 @inline private[this] def haveUserState = {
127 this._userState ne null
130 @inline private[this] def haveUserStateBootstrap = {
131 this._userStateBootstrap ne null
134 private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = {
135 val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
137 if(haveUserCreationIMEvent) {
138 throw new AquariumInternalError(
139 "Got user creation event (id=%s) but I already have one (id=%s)",
140 this._userCreationIMEvent.getOriginalID,
141 imEvent.getOriginalID
145 this._userCreationIMEvent = imEvent
148 val effectiveFromMillis = imEvent.getOccurredMillis
149 val role = imEvent.getRole
150 // calling unsafe just for the side-effect
151 assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis))
153 // add to model (will update the underlying messages as well)
154 if(this._userAgreementHistoryModel eq null) {
155 this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, imEvent.getOriginalID)
157 val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
158 this._userAgreementHistoryModel += newUserAgreementModel
162 private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
163 this._latestIMEventOriginalID = imEvent.getOriginalID
166 private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = {
167 this._latestResourceEventOriginalID = rcEvent.getOriginalID
171 * Creates the initial state that is related to IMEvents.
173 private[this] def initializeStateOfIMEvents(): Unit = {
174 // NOTE: this._userID is already set up by onInitializeUserActorState()
175 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
176 DEBUG("Replaying %s", imEvent)
178 updateAgreementHistoryFrom(imEvent)
179 updateLatestIMEventIDFrom(imEvent)
183 DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString)
189 * Resource events are processed only if the user has been created and has agreements.
190 * Otherwise nothing can be computed.
192 private[this] def shouldProcessResourceEvents: Boolean = {
193 haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
196 private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = {
197 assert(this.haveAgreements, "this.haveAgreements")
198 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
200 if(!haveUserStateBootstrap) {
201 this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
203 logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString))
204 val now = TimeHelpers.nowMillis()
205 this._userState = chargingService.replayMonthChargingUpTo(
206 BillingMonthInfo.fromMillis(now),
208 this._userStateBootstrap,
209 aquarium.currentResourceTypesMap,
210 aquarium.userStateStore.insertUserState
213 // Final touch: Update agreement history in the working user state.
214 // The assumption is that all agreement changes go via IMEvents, so the
215 // state this._workingAgreementHistory is always the authoritative source.
217 this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel
218 DEBUG("Computed working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
222 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
223 if(!this.haveAgreements) {
224 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
228 if(!this.haveUserCreationIMEvent) {
229 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
233 // We will also need this functionality when receiving IMEvents, so we place it in a method
234 loadUserStateAndUpdateAgreementHistory()
237 DEBUG("Initial working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
242 def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
243 this._userID = event.userID
244 DEBUG("Got %s", event)
246 initializeStateOfIMEvents()
247 initializeStateOfResourceEvents(event)
251 * Process [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s.
252 * When this method is called, we assume that all proper checks have been made and it
253 * is OK to proceed with the event processing.
255 def onIMEventMsg(imEvent: IMEventMsg): Unit = {
256 val hadUserCreationIMEvent = haveUserCreationIMEvent
258 if(!haveAgreements) {
259 // This IMEvent has arrived after any ResourceEvents
260 INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
261 initializeStateOfIMEvents()
264 if(this._latestIMEventOriginalID == imEvent.getOriginalID) {
265 // This happens when the actor is brought to life, then immediately initialized, and then
266 // sent the first IM event. But from the initialization procedure, this IM event will have
267 // already been loaded from DB!
268 INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
271 //this._latestIMEventID = imEvent.id
275 updateAgreementHistoryFrom(imEvent)
276 updateLatestIMEventIDFrom(imEvent)
279 // Must also update user state if we know when in history the life of a user begins
280 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
281 INFO("Processing user state, since we had a CREATE IMEvent")
282 loadUserStateAndUpdateAgreementHistory()
288 def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
289 if(!shouldProcessResourceEvents) {
290 // This means the user has not been created (at least, as far as Aquarium is concerned).
291 // So, we do not process any resource event
292 DEBUG("Not processing %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
298 // Since the latest resource event per resource is recorded in the user state,
299 // we do not need to query the store. Just query the in-memory state.
300 // Note: This is a similar situation with the first IMEvent received right after the user
302 if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) {
303 INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
309 val now = TimeHelpers.nowMillis()
310 // TODO: Review this and its usage in user state.
311 // TODO: The assumption is that the resource set increases all the time,
312 // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
313 val currentResourcesMap = aquarium.currentResourceTypesMap
315 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
316 val nowYear = nowBillingMonthInfo.year
317 val nowMonth = nowBillingMonthInfo.month
319 val eventOccurredMillis = rcEvent.getOccurredMillis
320 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
321 val eventYear = eventBillingMonthInfo.year
322 val eventMonth = eventBillingMonthInfo.month
324 def computeBatch(): Unit = {
325 DEBUG("Going for out of sync charging")
326 this._userState = chargingService.replayMonthChargingUpTo(
328 // Take into account that the event may be out-of-sync.
329 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
330 now max eventOccurredMillis,
331 this._userStateBootstrap,
333 stdUserStateStoreFunc
336 updateLatestResourceEventIDFrom(rcEvent)
339 def computeRealtime(): Unit = {
340 DEBUG("Going for in sync charging")
341 chargingService.processResourceEvent(
348 updateLatestResourceEventIDFrom(rcEvent)
351 val oldTotalCredits =
352 if(this._userState!=null)
353 this._userState.totalCredits
357 if(this._userState eq null) {
360 else if(nowYear != eventYear || nowMonth != eventMonth) {
362 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
368 else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
369 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
372 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
373 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
378 DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s",
379 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
380 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
384 val newTotalCredits = this._userState.totalCredits
385 if(oldTotalCredits * newTotalCredits < 0)
386 aquarium.eventBus ! new BalanceEvent(this._userState.userID,
388 DEBUG("Updated %s", this._userState)
392 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
394 val timeslot = event.timeslot
395 val state= if(haveUserState) Some(this._userState.msg) else None
396 val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state)
397 val billData = GetUserBillResponseData(this._userID,billEntry)
398 sender ! GetUserBillResponse(Right(billData))
402 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
406 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
407 val userID = event.userID
409 (haveUserCreationIMEvent, haveUserState) match {
411 // (User CREATEd, with balance state)
412 val realtimeMillis = TimeHelpers.nowMillis()
413 chargingService.calculateRealtimeUserState(
415 BillingMonthInfo.fromMillis(realtimeMillis),
419 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userState.totalCredits)))
422 // (User CREATEd, no balance state)
423 // Return the default initial balance
424 sender ! GetUserBalanceResponse(
426 GetUserBalanceResponseData(
428 aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis)
432 // (Not CREATEd, with balance state)
433 // Clearly this is internal error
434 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
436 case (false, false) ⇒
437 // (Not CREATEd, no balance state)
438 // The user is completely unknown
439 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
443 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
444 haveUserState match {
446 val realtimeMillis = TimeHelpers.nowMillis()
447 chargingService.calculateRealtimeUserState(
449 BillingMonthInfo.fromMillis(realtimeMillis),
453 sender ! GetUserStateResponse(Right(this._userState.msg))
456 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
460 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
461 haveUserState match {
463 DEBUG("haveWorkingUserState: %s", event)
464 val realtimeMillis = TimeHelpers.nowMillis()
465 chargingService.calculateRealtimeUserState(
467 BillingMonthInfo.fromMillis(realtimeMillis),
471 sender ! GetUserWalletResponse(
473 GetUserWalletResponseData(
475 this._userState.totalCredits,
476 MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries)
480 DEBUG("!haveWorkingUserState: %s", event)
481 haveUserCreationIMEvent match {
483 DEBUG("haveUserCreationIMEvent: %s", event)
484 sender ! GetUserWalletResponse(
486 GetUserWalletResponseData(
488 aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis),
489 MessageFactory.newWalletEntriesMsg()
493 DEBUG("!haveUserCreationIMEvent: %s", event)
494 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
499 private[this] def D_userID = {
503 private[this] def DEBUG(fmt: String, args: Any*) =
504 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
506 private[this] def INFO(fmt: String, args: Any*) =
507 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
509 private[this] def WARN(fmt: String, args: Any*) =
510 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
512 private[this] def ERROR(fmt: String, args: Any*) =
513 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
515 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
516 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)