2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.service.event.BalanceEvent
46 import gr.grnet.aquarium.event.model.im.IMEventModel
48 import config.AquariumPropertiesLoaded
49 import config.InitializeUserActorState
50 import event.ProcessIMEvent
51 import event.ProcessResourceEvent
52 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
53 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.charging.state.UserStateBootstrap
56 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
57 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
58 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
59 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
60 import message.GetUserBalanceRequest
61 import message.GetUserBalanceResponse
62 import message.GetUserBalanceResponseData
63 import message.GetUserStateRequest
64 import message.GetUserStateResponse
65 import message.GetUserWalletRequest
66 import message.GetUserWalletResponse
67 import message.GetUserWalletResponseData
69 import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
72 import gr.grnet.aquarium.policy.StdUserAgreement
73 import gr.grnet.aquarium.charging.state.UserStateBootstrap
74 import gr.grnet.aquarium.charging.bill.{AbstractBillEntry, BillEntry}
78 * @author Christos KK Loverdos <loverdos@gmail.com>
81 class UserActor extends ReflectiveRoleableActor {
82 private[this] var _userID: String = "<?>"
83 private[this] var _workingUserState: WorkingUserState = _
84 private[this] var _userCreationIMEvent: IMEventModel = _
85 private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
86 private[this] var _latestIMEventID: String = ""
87 private[this] var _latestResourceEventID: String = ""
88 private[this] var _userStateBootstrap: UserStateBootstrap = _
92 throw new AquariumInternalError("%s not initialized")
98 override def postStop() {
99 DEBUG("I am finally stopped (in postStop())")
100 aquarium.akkaService.notifyUserActorPostStop(this)
103 private[this] def shutmedown(): Unit = {
105 aquarium.akkaService.invalidateUserActor(this)
109 override protected def onThrowable(t: Throwable, message: AnyRef) = {
110 LogHelpers.logChainOfCauses(logger, t)
111 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
116 def role = UserActorRole
118 private[this] def chargingService = aquarium.chargingService
120 private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
121 aquarium.userStateStore.insertUserState(userState)
124 @inline private[this] def haveUserID = {
128 @inline private[this] def haveUserCreationIMEvent = {
129 this._userCreationIMEvent ne null
132 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
135 @inline private[this] def haveAgreements = {
136 this._workingAgreementHistory.size > 0
139 @inline private[this] def haveWorkingUserState = {
140 this._workingUserState ne null
143 @inline private[this] def haveUserStateBootstrap = {
144 this._userStateBootstrap ne null
147 private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
148 if(imEvent.isCreateUser) {
149 if(haveUserCreationIMEvent) {
150 throw new AquariumInternalError(
151 "Got user creation event (id=%s) but I already have one (id=%s)",
152 this._userCreationIMEvent.id,
157 this._userCreationIMEvent = imEvent
160 val effectiveFromMillis = imEvent.occurredMillis
161 val role = imEvent.role
162 // calling unsafe just for the side-effect
163 assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
165 val newAgreement = StdUserAgreement(
171 PolicyDefinedFullPriceTableRef
174 this._workingAgreementHistory += newAgreement
177 private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
178 this._latestIMEventID = imEvent.id
181 private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
182 this._latestResourceEventID = rcEvent.id
186 * Creates the initial state that is related to IMEvents.
188 private[this] def initializeStateOfIMEvents(): Unit = {
189 // NOTE: this._userID is already set up by onInitializeUserActorState()
190 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
191 DEBUG("Replaying %s", imEvent)
193 updateAgreementHistoryFrom(imEvent)
194 updateLatestIMEventIDFrom(imEvent)
198 DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
204 * Resource events are processed only if the user has been created and has agreements.
205 * Otherwise nothing can be computed.
207 private[this] def shouldProcessResourceEvents: Boolean = {
208 haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
211 private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
212 assert(this.haveAgreements, "this.haveAgreements")
213 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
215 val userCreationMillis = this._userCreationIMEvent.occurredMillis
216 val userCreationRole = this._userCreationIMEvent.role // initial role
217 val userCreationIMEventID = this._userCreationIMEvent.id
219 if(!haveUserStateBootstrap) {
220 this._userStateBootstrap = UserStateBootstrap(
223 aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
224 aquarium.initialUserBalance(userCreationRole, userCreationMillis)
228 val now = TimeHelpers.nowMillis()
229 this._workingUserState = chargingService.replayMonthChargingUpTo(
230 BillingMonthInfo.fromMillis(now),
232 this._userStateBootstrap,
233 aquarium.currentResourceTypesMap,
234 InitialUserActorSetup(),
235 aquarium.userStateStore.insertUserState
238 // Final touch: Update agreement history in the working user state.
239 // The assumption is that all agreement changes go via IMEvents, so the
240 // state this._workingAgreementHistory is always the authoritative source.
241 if(haveWorkingUserState) {
242 this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
243 DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
247 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
248 if(!this.haveAgreements) {
249 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
253 if(!this.haveUserCreationIMEvent) {
254 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
258 // We will also need this functionality when receiving IMEvents, so we place it in a method
259 loadWorkingUserStateAndUpdateAgreementHistory()
261 if(haveWorkingUserState) {
262 DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
267 def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
268 this._userID = event.userID
269 DEBUG("Got %s", event)
271 initializeStateOfIMEvents()
272 initializeStateOfResourceEvents(event)
276 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
277 * When this method is called, we assume that all proper checks have been made and it
278 * is OK to proceed with the event processing.
280 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
281 val imEvent = processEvent.imEvent
282 val hadUserCreationIMEvent = haveUserCreationIMEvent
284 if(!haveAgreements) {
285 // This IMEvent has arrived after any ResourceEvents
286 INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
287 initializeStateOfIMEvents()
290 if(this._latestIMEventID == imEvent.id) {
291 // This happens when the actor is brought to life, then immediately initialized, and then
292 // sent the first IM event. But from the initialization procedure, this IM event will have
293 // already been loaded from DB!
294 INFO("Ignoring first %s", imEvent.toDebugString)
297 //this._latestIMEventID = imEvent.id
300 if(imEvent.isAddCredits) {
301 if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
302 loadWorkingUserStateAndUpdateAgreementHistory()
303 onHandleAddCreditsEvent(imEvent)
306 updateAgreementHistoryFrom(imEvent)
307 updateLatestIMEventIDFrom(imEvent)
311 // Must also update user state if we know when in history the life of a user begins
312 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
313 INFO("Processing user state, since we had a CREATE IMEvent")
314 loadWorkingUserStateAndUpdateAgreementHistory()
320 /* Convert astakos message for adding credits
321 to a regular RESOURCE message */
322 def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
323 val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
324 val event = new StdResourceEvent(
326 imEvent.occurredMillis,
327 imEvent.receivedMillis,
333 imEvent.eventVersion,
336 //Console.err.println("Event: " + event)
337 //Console.err.println("Total credits before: " + _workingUserState.totalCredits)
338 onProcessResourceEvent(new ProcessResourceEvent(event))
339 //Console.err.println("Total credits after: " + _workingUserState.totalCredits)
340 //Console.err.println("OK.")
343 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
344 val rcEvent = event.rcEvent
346 if(!shouldProcessResourceEvents) {
347 // This means the user has not been created (at least, as far as Aquarium is concerned).
348 // So, we do not process any resource event
349 DEBUG("Not processing %s", rcEvent.toJsonString)
355 // Since the latest resource event per resource is recorded in the user state,
356 // we do not need to query the store. Just query the in-memory state.
357 // Note: This is a similar situation with the first IMEvent received right after the user
359 if(this._latestResourceEventID == rcEvent.id) {
360 INFO("Ignoring first %s", rcEvent.toDebugString)
366 val now = TimeHelpers.nowMillis()
367 val currentResourcesMap = aquarium.currentResourceTypesMap
368 val chargingReason = RealtimeChargingReason(None, now)
370 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
371 val nowYear = nowBillingMonthInfo.year
372 val nowMonth = nowBillingMonthInfo.month
374 val eventOccurredMillis = rcEvent.occurredMillis
375 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
376 val eventYear = eventBillingMonthInfo.year
377 val eventMonth = eventBillingMonthInfo.month
379 def computeBatch(): Unit = {
380 DEBUG("Going for out of sync charging")
381 this._workingUserState = chargingService.replayMonthChargingUpTo(
383 // Take into account that the event may be out-of-sync.
384 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
385 now max eventOccurredMillis,
386 this._userStateBootstrap,
389 stdUserStateStoreFunc
392 updateLatestResourceEventIDFrom(rcEvent)
395 def computeRealtime(): Unit = {
396 DEBUG("Going for in sync charging")
397 chargingService.processResourceEvent(
399 this._workingUserState,
405 updateLatestResourceEventIDFrom(rcEvent)
408 val oldTotalCredits = this._workingUserState.totalCredits
410 if(nowYear != eventYear || nowMonth != eventMonth) {
412 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
418 else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
419 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
422 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
423 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
430 val newTotalCredits = this._workingUserState.totalCredits
431 if(oldTotalCredits * newTotalCredits < 0)
432 aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
434 DEBUG("Updated %s", this._workingUserState)
438 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
440 val timeslot = event.timeslot
441 val state= if(haveWorkingUserState) Some(this._workingUserState) else None
442 val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state)
443 val billData = GetUserBillResponseData(this._userID,billEntry)
444 sender ! GetUserBillResponse(Right(billData))
448 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
452 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
453 val userID = event.userID
455 (haveUserCreationIMEvent, haveWorkingUserState) match {
457 // (User CREATEd, with balance state)
458 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
461 // (User CREATEd, no balance state)
462 // Return the default initial balance
463 sender ! GetUserBalanceResponse(
465 GetUserBalanceResponseData(
467 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
471 // (Not CREATEd, with balance state)
472 // Clearly this is internal error
473 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
475 case (false, false) ⇒
476 // (Not CREATEd, no balance state)
477 // The user is completely unknown
478 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
482 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
483 haveWorkingUserState match {
485 sender ! GetUserStateResponse(Right(this._workingUserState))
488 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
492 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
493 haveWorkingUserState match {
495 DEBUG("haveWorkingUserState: %s", event)
496 sender ! GetUserWalletResponse(
498 GetUserWalletResponseData(
500 this._workingUserState.totalCredits,
501 this._workingUserState.walletEntries.toList
505 DEBUG("!haveWorkingUserState: %s", event)
506 haveUserCreationIMEvent match {
508 DEBUG("haveUserCreationIMEvent: %s", event)
509 sender ! GetUserWalletResponse(
511 GetUserWalletResponseData(
513 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
518 DEBUG("!haveUserCreationIMEvent: %s", event)
519 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
524 private[this] def D_userID = {
528 private[this] def DEBUG(fmt: String, args: Any*) =
529 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
531 private[this] def INFO(fmt: String, args: Any*) =
532 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
534 private[this] def WARN(fmt: String, args: Any*) =
535 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
537 private[this] def ERROR(fmt: String, args: Any*) =
538 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
540 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
541 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)