2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.util.date.TimeHelpers
41 import gr.grnet.aquarium.service.event.BalanceEvent
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
44 import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
45 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
46 import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel}
51 import gr.grnet.aquarium.policy.PolicyDefinedFullPriceTableRef
52 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
53 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
54 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
55 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
56 import gr.grnet.aquarium.actor.message.GetUserStateRequest
57 import gr.grnet.aquarium.actor.message.GetUserStateResponse
58 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
59 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
60 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
61 import gr.grnet.aquarium.actor.message.GetUserBillRequest
62 import gr.grnet.aquarium.actor.message.GetUserBillResponse
63 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
64 import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
65 import gr.grnet.aquarium.policy.StdUserAgreement
66 import gr.grnet.aquarium.charging.state.UserStateBootstrap
67 import gr.grnet.aquarium.charging.bill.BillEntry
71 * @author Christos KK Loverdos <loverdos@gmail.com>
74 class UserActor extends ReflectiveRoleableActor {
// ID of the user served by this actor. Holds the "<?>" placeholder until
// onInitializeUserActorState() assigns the ID carried by the initialization message.
private[this] var _userID: String = "<?>"
// Latest working user state, assigned from chargingService.replayMonthChargingUpTo(...).
// Starts out as null; see haveWorkingUserState.
private[this] var _workingUserState: WorkingUserState = _
// The IMEvent that created the user. Starts out as null; see haveUserCreationIMEvent.
private[this] var _userCreationIMEvent: IMEventModel = _
// Agreements appended by updateAgreementHistoryFrom() as IMEvents are processed.
private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
// ID of the last IMEvent recorded via updateLatestIMEventIDFrom().
private[this] var _latestIMEventID: String = ""
// ID of the last resource event recorded via updateLatestResourceEventIDFrom().
private[this] var _latestResourceEventID: String = ""
// Bootstrap data handed to the charging replay. Starts out as null; see haveUserStateBootstrap.
private[this] var _userStateBootstrap: UserStateBootstrap = _
85 throw new AquariumInternalError("%s not initialized")
/**
 * Lifecycle hook invoked once the actor has stopped.
 *
 * Logs the fact and then informs `aquarium.akkaService` through
 * `notifyUserActorPostStop`, passing this actor.
 */
override def postStop(): Unit = {
  DEBUG("I am finally stopped (in postStop())")
  aquarium.akkaService.notifyUserActorPostStop(this)
}
96 private[this] def shutmedown(): Unit = {
98 aquarium.akkaService.invalidateUserActor(this)
102 override protected def onThrowable(t: Throwable, message: AnyRef) = {
103 LogHelpers.logChainOfCauses(logger, t)
104 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
/** The role of this actor: always `UserActorRole`. */
def role = UserActorRole
/** Shorthand for `aquarium.chargingService`; looked up anew on every call. */
private[this] def chargingService = aquarium.chargingService
/**
 * Builds a function that hands a user state to
 * `aquarium.userStateStore.insertUserState` and yields whatever that call returns.
 *
 * A fresh function value is created on each invocation of this method.
 */
private[this] def stdUserStateStoreFunc = { (state: UserStateModel) ⇒
  aquarium.userStateStore.insertUserState(state)
}
117 @inline private[this] def haveUserID = {
/** `true` once a user-creation IMEvent has been stored in `_userCreationIMEvent`. */
@inline private[this] def haveUserCreationIMEvent =
  null ne this._userCreationIMEvent
/**
 * Handler for the [[gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded]] message.
 *
 * Currently a no-op: the message is accepted and ignored.
 */
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = ()
/** `true` when the working agreement history contains at least one agreement. */
@inline private[this] def haveAgreements =
  0 < this._workingAgreementHistory.size
/** `true` once `_workingUserState` holds a (non-null) working user state. */
@inline private[this] def haveWorkingUserState =
  null ne this._workingUserState
/** `true` once `_userStateBootstrap` has been assigned a (non-null) bootstrap. */
@inline private[this] def haveUserStateBootstrap =
  null ne this._userStateBootstrap
140 private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
141 if(imEvent.isCreateUser) {
142 if(haveUserCreationIMEvent) {
143 throw new AquariumInternalError(
144 "Got user creation event (id=%s) but I already have one (id=%s)",
145 this._userCreationIMEvent.id,
150 this._userCreationIMEvent = imEvent
153 val effectiveFromMillis = imEvent.occurredMillis
154 val role = imEvent.role
155 // calling unsafe just for the side-effect
156 assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
158 val newAgreement = StdUserAgreement(
164 PolicyDefinedFullPriceTableRef()
167 this._workingAgreementHistory += newAgreement
/**
 * Remembers the ID of the given IMEvent as the latest one seen.
 *
 * @param imEvent the event whose `id` is recorded in `_latestIMEventID`
 */
private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit =
  _latestIMEventID = imEvent.id
/**
 * Remembers the ID of the given resource event as the latest one seen.
 *
 * @param rcEvent the event whose `id` is recorded in `_latestResourceEventID`
 */
private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit =
  _latestResourceEventID = rcEvent.id
179 * Creates the initial state that is related to IMEvents.
181 private[this] def initializeStateOfIMEvents(): Unit = {
182 // NOTE: this._userID is already set up by onInitializeUserActorState()
183 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
184 DEBUG("Replaying %s", imEvent)
186 updateAgreementHistoryFrom(imEvent)
187 updateLatestIMEventIDFrom(imEvent)
191 DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
/**
 * Tells whether resource events can be processed.
 *
 * All three conditions must hold: a user-creation IMEvent has been seen, at least one
 * agreement is known, and the user state bootstrap has been set up. The checks are
 * made in that order and stop at the first one that fails.
 */
private[this] def shouldProcessResourceEvents: Boolean =
  if(!haveUserCreationIMEvent) false
  else if(!haveAgreements) false
  else haveUserStateBootstrap
204 private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
205 assert(this.haveAgreements, "this.haveAgreements")
206 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
208 val userCreationMillis = this._userCreationIMEvent.occurredMillis
209 val userCreationRole = this._userCreationIMEvent.role // initial role
210 val userCreationIMEventID = this._userCreationIMEvent.id
212 if(!haveUserStateBootstrap) {
213 this._userStateBootstrap = UserStateBootstrap(
216 aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
217 aquarium.initialUserBalance(userCreationRole, userCreationMillis)
221 val now = TimeHelpers.nowMillis()
222 this._workingUserState = chargingService.replayMonthChargingUpTo(
223 BillingMonthInfo.fromMillis(now),
225 this._userStateBootstrap,
226 aquarium.currentResourceTypesMap,
227 aquarium.userStateStore.insertUserState
230 // Final touch: Update agreement history in the working user state.
231 // The assumption is that all agreement changes go via IMEvents, so the
232 // state this._workingAgreementHistory is always the authoritative source.
233 if(haveWorkingUserState) {
234 this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
235 DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
239 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
240 if(!this.haveAgreements) {
241 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
245 if(!this.haveUserCreationIMEvent) {
246 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
250 // We will also need this functionality when receiving IMEvents, so we place it in a method
251 loadWorkingUserStateAndUpdateAgreementHistory()
253 if(haveWorkingUserState) {
254 DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
/**
 * Handler for the [[gr.grnet.aquarium.actor.message.config.InitializeUserActorState]] message.
 *
 * Adopts the user ID carried by the message, then initializes the IMEvent-related state
 * followed by the resource-event-related state.
 *
 * @param event the initialization message; its `userID` becomes this actor's user ID
 */
def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
  // The ID is assigned before anything else: initializeStateOfIMEvents() reads this._userID.
  _userID = event.userID
  DEBUG("Got %s", event)

  // IMEvent state first, then the resource-event state.
  initializeStateOfIMEvents()
  initializeStateOfResourceEvents(event)
}
268 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
269 * When this method is called, we assume that all proper checks have been made and it
270 * is OK to proceed with the event processing.
272 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
273 val imEvent = processEvent.imEvent
274 val hadUserCreationIMEvent = haveUserCreationIMEvent
276 if(!haveAgreements) {
277 // This IMEvent has arrived after any ResourceEvents
278 INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
279 initializeStateOfIMEvents()
282 if(this._latestIMEventID == imEvent.id) {
283 // This happens when the actor is brought to life, then immediately initialized, and then
284 // sent the first IM event. But from the initialization procedure, this IM event will have
285 // already been loaded from DB!
286 INFO("Ignoring first %s", imEvent.toDebugString)
289 //this._latestIMEventID = imEvent.id
292 if(imEvent.isAddCredits) {
293 if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
294 loadWorkingUserStateAndUpdateAgreementHistory()
295 onHandleAddCreditsEvent(imEvent)
298 updateAgreementHistoryFrom(imEvent)
299 updateLatestIMEventIDFrom(imEvent)
303 // Must also update user state if we know when in history the life of a user begins
304 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
305 INFO("Processing user state, since we had a CREATE IMEvent")
306 loadWorkingUserStateAndUpdateAgreementHistory()
312 /* Convert astakos message for adding credits
313 to a regular RESOURCE message */
314 def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
315 val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
316 val event = new StdResourceEvent(
318 imEvent.occurredMillis,
319 imEvent.receivedMillis,
325 imEvent.eventVersion,
328 //Console.err.println("Event: " + event)
329 //Console.err.println("Total credits before: " + _workingUserState.totalCredits)
330 onProcessResourceEvent(new ProcessResourceEvent(event))
331 //Console.err.println("Total credits after: " + _workingUserState.totalCredits)
332 //Console.err.println("OK.")
335 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
336 val rcEvent = event.rcEvent
338 if(!shouldProcessResourceEvents) {
339 // This means the user has not been created (at least, as far as Aquarium is concerned).
340 // So, we do not process any resource event
341 DEBUG("Not processing %s", rcEvent.toJsonString)
347 // Since the latest resource event per resource is recorded in the user state,
348 // we do not need to query the store. Just query the in-memory state.
349 // Note: This is a similar situation with the first IMEvent received right after the user
351 if(this._latestResourceEventID == rcEvent.id) {
352 INFO("Ignoring first %s", rcEvent.toDebugString)
358 val now = TimeHelpers.nowMillis()
359 // TODO: Review this and its usage in user state.
360 // TODO: The assumption is that the resource set increases all the time,
361 // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
362 val currentResourcesMap = aquarium.currentResourceTypesMap
364 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
365 val nowYear = nowBillingMonthInfo.year
366 val nowMonth = nowBillingMonthInfo.month
368 val eventOccurredMillis = rcEvent.occurredMillis
369 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
370 val eventYear = eventBillingMonthInfo.year
371 val eventMonth = eventBillingMonthInfo.month
373 def computeBatch(): Unit = {
374 DEBUG("Going for out of sync charging")
375 this._workingUserState = chargingService.replayMonthChargingUpTo(
377 // Take into account that the event may be out-of-sync.
378 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
379 now max eventOccurredMillis,
380 this._userStateBootstrap,
382 stdUserStateStoreFunc
385 updateLatestResourceEventIDFrom(rcEvent)
388 def computeRealtime(): Unit = {
389 DEBUG("Going for in sync charging")
390 chargingService.processResourceEvent(
392 this._workingUserState,
397 updateLatestResourceEventIDFrom(rcEvent)
400 val oldTotalCredits = this._workingUserState.totalCredits
402 if(nowYear != eventYear || nowMonth != eventMonth) {
404 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
410 else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
411 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
414 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
415 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
422 val newTotalCredits = this._workingUserState.totalCredits
423 if(oldTotalCredits * newTotalCredits < 0)
424 aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
426 DEBUG("Updated %s", this._workingUserState)
430 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
432 val timeslot = event.timeslot
433 val state= if(haveWorkingUserState) Some(this._workingUserState) else None
434 val billEntry = BillEntry.fromWorkingUserState(timeslot,this._userID,state)
435 val billData = GetUserBillResponseData(this._userID,billEntry)
436 sender ! GetUserBillResponse(Right(billData))
440 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
444 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
445 val userID = event.userID
447 (haveUserCreationIMEvent, haveWorkingUserState) match {
449 // (User CREATEd, with balance state)
450 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
453 // (User CREATEd, no balance state)
454 // Return the default initial balance
455 sender ! GetUserBalanceResponse(
457 GetUserBalanceResponseData(
459 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
463 // (Not CREATEd, with balance state)
464 // Clearly this is internal error
465 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
467 case (false, false) ⇒
468 // (Not CREATEd, no balance state)
469 // The user is completely unknown
470 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
/**
 * Handler for the [[gr.grnet.aquarium.actor.message.GetUserStateRequest]] message.
 *
 * Replies to the sender with the working user state when one exists; otherwise replies
 * with an error message (code AQU-STA-0006) together with the value 404.
 *
 * @param event the request; its `userID` is only used in the error message
 */
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
  if(haveWorkingUserState) {
    sender ! GetUserStateResponse(Right(this._workingUserState))
  }
  else {
    sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
  }
}
484 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
485 haveWorkingUserState match {
487 DEBUG("haveWorkingUserState: %s", event)
488 sender ! GetUserWalletResponse(
490 GetUserWalletResponseData(
492 this._workingUserState.totalCredits,
493 this._workingUserState.walletEntries.toList
497 DEBUG("!haveWorkingUserState: %s", event)
498 haveUserCreationIMEvent match {
500 DEBUG("haveUserCreationIMEvent: %s", event)
501 sender ! GetUserWalletResponse(
503 GetUserWalletResponseData(
505 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
510 DEBUG("!haveUserCreationIMEvent: %s", event)
511 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
516 private[this] def D_userID = {
/**
 * Logs at debug level. The message is `fmt` formatted with `args`, prefixed with the
 * `D_userID` tag in square brackets.
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(userTag, text))
}
/**
 * Logs at info level. The message is `fmt` formatted with `args`, prefixed with the
 * `D_userID` tag in square brackets.
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(userTag, text))
}
/**
 * Logs at warn level. The message is `fmt` formatted with `args`, prefixed with the
 * `D_userID` tag in square brackets.
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(userTag, text))
}
/**
 * Logs at error level. The message is `fmt` formatted with `args`, prefixed with the
 * `D_userID` tag in square brackets.
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, text))
}
/**
 * Logs at error level together with a throwable. The message is `fmt` formatted with
 * `args`, prefixed with the `D_userID` tag in square brackets; `t` is passed to the
 * logger alongside it.
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, text), t)
}