2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.util.date.TimeHelpers
41 import gr.grnet.aquarium.service.event.BalanceEvent
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
44 import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
45 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
46 import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel}
51 import gr.grnet.aquarium.policy.PolicyDefinedFullPriceTableRef
52 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
53 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
54 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
55 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
56 import gr.grnet.aquarium.actor.message.GetUserStateRequest
57 import gr.grnet.aquarium.actor.message.GetUserStateResponse
58 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
59 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
60 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
61 import gr.grnet.aquarium.actor.message.GetUserBillRequest
62 import gr.grnet.aquarium.actor.message.GetUserBillResponse
63 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
64 import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
65 import gr.grnet.aquarium.policy.StdUserAgreement
66 import gr.grnet.aquarium.charging.state.UserStateBootstrap
67 import gr.grnet.aquarium.charging.bill.{AbstractBillEntry, BillEntry}
71 * @author Christos KK Loverdos <loverdos@gmail.com>
74 class UserActor extends ReflectiveRoleableActor {
75 private[this] var _userID: String = "<?>"
76 private[this] var _workingUserState: WorkingUserState = _
77 private[this] var _userCreationIMEvent: IMEventModel = _
78 private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
79 private[this] var _latestIMEventID: String = ""
80 private[this] var _latestResourceEventID: String = ""
81 private[this] var _userStateBootstrap: UserStateBootstrap = _
85 throw new AquariumInternalError("%s not initialized")
91 override def postStop() {
92 DEBUG("I am finally stopped (in postStop())")
93 aquarium.akkaService.notifyUserActorPostStop(this)
96 private[this] def shutmedown(): Unit = {
98 aquarium.akkaService.invalidateUserActor(this)
102 override protected def onThrowable(t: Throwable, message: AnyRef) = {
103 LogHelpers.logChainOfCauses(logger, t)
104 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
109 def role = UserActorRole
111 private[this] def chargingService = aquarium.chargingService
113 private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
114 aquarium.userStateStore.insertUserState(userState)
117 @inline private[this] def haveUserID = {
121 @inline private[this] def haveUserCreationIMEvent = {
122 this._userCreationIMEvent ne null
125 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
128 @inline private[this] def haveAgreements = {
129 this._workingAgreementHistory.size > 0
132 @inline private[this] def haveWorkingUserState = {
133 this._workingUserState ne null
136 @inline private[this] def haveUserStateBootstrap = {
137 this._userStateBootstrap ne null
140 private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
141 if(imEvent.isCreateUser) {
142 if(haveUserCreationIMEvent) {
143 throw new AquariumInternalError(
144 "Got user creation event (id=%s) but I already have one (id=%s)",
145 this._userCreationIMEvent.id,
150 this._userCreationIMEvent = imEvent
153 val effectiveFromMillis = imEvent.occurredMillis
154 val role = imEvent.role
155 // calling unsafe just for the side-effect
156 assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
158 val newAgreement = StdUserAgreement(
164 PolicyDefinedFullPriceTableRef()
167 this._workingAgreementHistory += newAgreement
170 private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
171 this._latestIMEventID = imEvent.id
174 private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
175 this._latestResourceEventID = rcEvent.id
179 * Creates the initial state that is related to IMEvents.
181 private[this] def initializeStateOfIMEvents(): Unit = {
182 // NOTE: this._userID is already set up by onInitializeUserActorState()
183 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
184 DEBUG("Replaying %s", imEvent)
186 updateAgreementHistoryFrom(imEvent)
187 updateLatestIMEventIDFrom(imEvent)
191 DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
197 * Resource events are processed only if the user has been created and has agreements.
198 * Otherwise nothing can be computed.
200 private[this] def shouldProcessResourceEvents: Boolean = {
201 haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
204 private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
205 assert(this.haveAgreements, "this.haveAgreements")
206 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
208 val userCreationMillis = this._userCreationIMEvent.occurredMillis
209 val userCreationRole = this._userCreationIMEvent.role // initial role
210 val userCreationIMEventID = this._userCreationIMEvent.id
212 if(!haveUserStateBootstrap) {
213 this._userStateBootstrap = UserStateBootstrap(
216 aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
217 aquarium.initialUserBalance(userCreationRole, userCreationMillis)
221 val now = TimeHelpers.nowMillis()
222 this._workingUserState = chargingService.replayMonthChargingUpTo(
223 BillingMonthInfo.fromMillis(now),
225 this._userStateBootstrap,
226 aquarium.currentResourceTypesMap,
227 aquarium.userStateStore.insertUserState
230 // Final touch: Update agreement history in the working user state.
231 // The assumption is that all agreement changes go via IMEvents, so the
232 // state this._workingAgreementHistory is always the authoritative source.
233 if(haveWorkingUserState) {
234 this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
235 DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
239 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
240 if(!this.haveAgreements) {
241 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
245 if(!this.haveUserCreationIMEvent) {
246 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
250 // We will also need this functionality when receiving IMEvents, so we place it in a method
251 loadWorkingUserStateAndUpdateAgreementHistory()
253 if(haveWorkingUserState) {
254 DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
259 def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
260 this._userID = event.userID
261 DEBUG("Got %s", event)
263 initializeStateOfIMEvents()
264 initializeStateOfResourceEvents(event)
268 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
269 * When this method is called, we assume that all proper checks have been made and it
270 * is OK to proceed with the event processing.
272 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
273 val imEvent = processEvent.imEvent
274 val hadUserCreationIMEvent = haveUserCreationIMEvent
276 if(!haveAgreements) {
277 // This IMEvent has arrived after any ResourceEvents
278 INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
279 initializeStateOfIMEvents()
282 if(this._latestIMEventID == imEvent.id) {
283 // This happens when the actor is brought to life, then immediately initialized, and then
284 // sent the first IM event. But from the initialization procedure, this IM event will have
285 // already been loaded from DB!
286 INFO("Ignoring first %s", imEvent.toDebugString)
289 //this._latestIMEventID = imEvent.id
292 if(imEvent.isAddCredits) {
293 if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
294 loadWorkingUserStateAndUpdateAgreementHistory()
295 onHandleAddCreditsEvent(imEvent)
298 updateAgreementHistoryFrom(imEvent)
299 updateLatestIMEventIDFrom(imEvent)
304 // Must also update user state if we know when in history the life of a user begins
305 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
306 INFO("Processing user state, since we had a CREATE IMEvent")
307 loadWorkingUserStateAndUpdateAgreementHistory()
313 /* Convert astakos message for adding credits
314 to a regular RESOURCE message */
315 def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
316 DEBUG("Got %s", imEvent.toJsonString)
318 val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
319 val event = new StdResourceEvent(
321 imEvent.occurredMillis,
322 imEvent.receivedMillis,
328 imEvent.eventVersion,
331 DEBUG("Transformed to %s", event)
332 DEBUG("Total credits before: %s", _workingUserState.totalCredits)
333 aquarium.resourceEventStore.insertResourceEvent(event)
334 onProcessResourceEvent(new ProcessResourceEvent(event))
335 DEBUG("Total credits after: %s", _workingUserState.totalCredits)
336 //Console.err.println("OK.")
339 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
340 val rcEvent = event.rcEvent
342 if(!shouldProcessResourceEvents) {
343 // This means the user has not been created (at least, as far as Aquarium is concerned).
344 // So, we do not process any resource event
345 DEBUG("Not processing %s", rcEvent.toJsonString)
351 // Since the latest resource event per resource is recorded in the user state,
352 // we do not need to query the store. Just query the in-memory state.
353 // Note: This is a similar situation with the first IMEvent received right after the user
355 if(this._latestResourceEventID == rcEvent.id) {
356 INFO("Ignoring first %s", rcEvent.toDebugString)
362 val now = TimeHelpers.nowMillis()
363 // TODO: Review this and its usage in user state.
364 // TODO: The assumption is that the resource set increases all the time,
365 // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
366 val currentResourcesMap = aquarium.currentResourceTypesMap
368 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
369 val nowYear = nowBillingMonthInfo.year
370 val nowMonth = nowBillingMonthInfo.month
372 val eventOccurredMillis = rcEvent.occurredMillis
373 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
374 val eventYear = eventBillingMonthInfo.year
375 val eventMonth = eventBillingMonthInfo.month
377 def computeBatch(): Unit = {
378 DEBUG("Going for out of sync charging")
379 this._workingUserState = chargingService.replayMonthChargingUpTo(
381 // Take into account that the event may be out-of-sync.
382 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
383 now max eventOccurredMillis,
384 this._userStateBootstrap,
386 stdUserStateStoreFunc
389 updateLatestResourceEventIDFrom(rcEvent)
392 def computeRealtime(): Unit = {
393 DEBUG("Going for in sync charging")
394 chargingService.processResourceEvent(
396 this._workingUserState,
401 updateLatestResourceEventIDFrom(rcEvent)
404 val oldTotalCredits =
405 if(this._workingUserState!=null)
406 this._workingUserState.totalCredits
410 if(nowYear != eventYear || nowMonth != eventMonth) {
412 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
418 else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
419 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
422 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
423 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
428 DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s",
429 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
430 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis))
434 val newTotalCredits = this._workingUserState.totalCredits
435 if(oldTotalCredits * newTotalCredits < 0)
436 aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
438 DEBUG("Updated %s", this._workingUserState)
442 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
444 val timeslot = event.timeslot
445 val state= if(haveWorkingUserState) Some(this._workingUserState) else None
446 val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state)
447 val billData = GetUserBillResponseData(this._userID,billEntry)
448 sender ! GetUserBillResponse(Right(billData))
452 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
456 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
457 val userID = event.userID
459 (haveUserCreationIMEvent, haveWorkingUserState) match {
461 // (User CREATEd, with balance state)
462 val realtimeMillis = TimeHelpers.nowMillis()
463 chargingService.calculateRealtimeWorkingUserState(
464 this._workingUserState,
465 BillingMonthInfo.fromMillis(realtimeMillis),
469 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
472 // (User CREATEd, no balance state)
473 // Return the default initial balance
474 sender ! GetUserBalanceResponse(
476 GetUserBalanceResponseData(
478 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
482 // (Not CREATEd, with balance state)
483 // Clearly this is internal error
484 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
486 case (false, false) ⇒
487 // (Not CREATEd, no balance state)
488 // The user is completely unknown
489 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
493 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
494 haveWorkingUserState match {
496 val realtimeMillis = TimeHelpers.nowMillis()
497 chargingService.calculateRealtimeWorkingUserState(
498 this._workingUserState,
499 BillingMonthInfo.fromMillis(realtimeMillis),
503 sender ! GetUserStateResponse(Right(this._workingUserState))
506 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
510 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
511 haveWorkingUserState match {
513 DEBUG("haveWorkingUserState: %s", event)
514 val realtimeMillis = TimeHelpers.nowMillis()
515 chargingService.calculateRealtimeWorkingUserState(
516 this._workingUserState,
517 BillingMonthInfo.fromMillis(realtimeMillis),
521 sender ! GetUserWalletResponse(
523 GetUserWalletResponseData(
525 this._workingUserState.totalCredits,
526 this._workingUserState.walletEntries.toList
530 DEBUG("!haveWorkingUserState: %s", event)
531 haveUserCreationIMEvent match {
533 DEBUG("haveUserCreationIMEvent: %s", event)
534 sender ! GetUserWalletResponse(
536 GetUserWalletResponseData(
538 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
543 DEBUG("!haveUserCreationIMEvent: %s", event)
544 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
549 private[this] def D_userID = {
553 private[this] def DEBUG(fmt: String, args: Any*) =
554 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
556 private[this] def INFO(fmt: String, args: Any*) =
557 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
559 private[this] def WARN(fmt: String, args: Any*) =
560 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
562 private[this] def ERROR(fmt: String, args: Any*) =
563 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
565 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
566 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)