2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.computation.state.UserStateBootstrap
51 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
52 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
53 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
57 * @author Christos KK Loverdos <loverdos@gmail.com>
60 class UserActor extends ReflectiveRoleableActor {
61 private[this] var _userID: String = "<?>"
62 private[this] var _workingUserState: WorkingUserState = _
63 private[this] var _userCreationIMEvent: IMEventModel = _
64 private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
65 private[this] var _latestIMEventID: String = ""
66 private[this] var _latestResourceEventID: String = ""
67 private[this] var _userStateBootstrap: UserStateBootstrap = _
71 throw new AquariumInternalError("%s not initialized")
77 override def postStop() {
78 DEBUG("I am finally stopped (in postStop())")
79 aquarium.akkaService.notifyUserActorPostStop(this)
82 private[this] def shutmedown(): Unit = {
84 aquarium.akkaService.invalidateUserActor(this)
88 override protected def onThrowable(t: Throwable, message: AnyRef) = {
89 LogHelpers.logChainOfCauses(logger, t)
90 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
95 def role = UserActorRole
97 private[this] def chargingService = aquarium.chargingService
99 private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
100 aquarium.userStateStore.insertUserState(userState)
103 @inline private[this] def haveUserID = {
107 @inline private[this] def haveUserCreationIMEvent = {
108 this._userCreationIMEvent ne null
111 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
114 @inline private[this] def haveAgreements = {
115 this._workingAgreementHistory.size > 0
118 @inline private[this] def haveWorkingUserState = {
119 this._workingUserState ne null
122 @inline private[this] def haveUserStateBootstrap = {
123 this._userStateBootstrap ne null
126 private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
127 if(imEvent.isCreateUser) {
128 if(haveUserCreationIMEvent) {
129 throw new AquariumInternalError(
130 "Got user creation event (id=%s) but I already have one (id=%s)",
131 this._userCreationIMEvent.id,
136 this._userCreationIMEvent = imEvent
139 val effectiveFromMillis = imEvent.occurredMillis
140 val role = imEvent.role
141 // calling unsafe just for the side-effect
142 assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
144 val newAgreement = StdUserAgreement(
150 PolicyDefinedFullPriceTableRef
153 this._workingAgreementHistory += newAgreement
156 private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
157 this._latestIMEventID = imEvent.id
161 * Creates the initial state that is related to IMEvents.
163 private[this] def initializeStateOfIMEvents(): Unit = {
164 // NOTE: this._userID is already set up by onInitializeUserActorState()
165 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
166 DEBUG("Replaying %s", imEvent)
168 updateAgreementHistoryFrom(imEvent)
169 updateLatestIMEventIDFrom(imEvent)
173 DEBUG("Initial %s", this._workingAgreementHistory.toJsonString)
179 * Resource events are processed only if the user has been created and has agreements.
180 * Otherwise nothing can be computed.
182 private[this] def shouldProcessResourceEvents: Boolean = {
183 haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
186 private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
187 assert(this.haveAgreements, "this.haveAgreements")
188 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
190 val userCreationMillis = this._userCreationIMEvent.occurredMillis
191 val userCreationRole = this._userCreationIMEvent.role // initial role
192 val userCreationIMEventID = this._userCreationIMEvent.id
194 if(!haveUserStateBootstrap) {
195 this._userStateBootstrap = UserStateBootstrap(
198 aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
199 aquarium.initialUserBalance(userCreationRole, userCreationMillis)
203 val now = TimeHelpers.nowMillis()
204 this._workingUserState = chargingService.replayMonthChargingUpTo(
205 BillingMonthInfo.fromMillis(now),
207 this._userStateBootstrap,
208 aquarium.currentResourceTypesMap,
209 InitialUserActorSetup(),
210 aquarium.userStateStore.insertUserState,
214 // Final touch: Update agreement history in the working user state.
215 // The assumption is that all agreement changes go via IMEvents, so the
216 // state this._workingAgreementHistory is always the authoritative source.
217 if(haveWorkingUserState) {
218 this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
219 DEBUG("Computed %s", this._workingUserState.toJsonString)
223 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
224 if(!this.haveAgreements) {
225 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
229 if(!this.haveUserCreationIMEvent) {
230 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
234 // We will also need this functionality when receiving IMEvents, so we place it in a method
235 loadWorkingUserStateAndUpdateAgreementHistory()
237 if(haveWorkingUserState) {
238 DEBUG("Initial %s", this._workingUserState.toJsonString)
243 def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
244 this._userID = event.userID
245 DEBUG("Got %s", event)
247 initializeStateOfIMEvents()
248 initializeStateOfResourceEvents(event)
252 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
253 * When this method is called, we assume that all proper checks have been made and it
254 * is OK to proceed with the event processing.
256 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
257 val imEvent = processEvent.imEvent
258 val hadUserCreationIMEvent = haveUserCreationIMEvent
260 if(!haveAgreements) {
261 // This is an error. Should have been initialized from somewhere ...
262 throw new AquariumInternalError("No agreements. Cannot process %s", processEvent)
265 if(this._latestIMEventID == imEvent.id) {
266 // This happens when the actor is brought to life, then immediately initialized, and then
267 // sent the first IM event. But from the initialization procedure, this IM event will have
268 // already been loaded from DB!
269 INFO("Ignoring first %s", imEvent.toDebugString)
272 //this._latestIMEventID = imEvent.id
276 updateAgreementHistoryFrom(imEvent)
277 updateLatestIMEventIDFrom(imEvent)
279 // Must also update user state if we know when in history the life of a user begins
280 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
281 loadWorkingUserStateAndUpdateAgreementHistory()
285 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
286 val rcEvent = event.rcEvent
288 if(!shouldProcessResourceEvents) {
289 // This means the user has not been created (at least, as far as Aquarium is concerned).
290 // So, we do not process any resource event
291 DEBUG("Not processing %s", rcEvent.toJsonString)
297 // Since the latest resource event per resource is recorded in the user state,
298 // we do not need to query the store. Just query the in-memory state.
299 // Note: This is a similar situation with the first IMEvent received right after the user
301 if(this._latestResourceEventID == rcEvent.id) {
302 INFO("Ignoring first %s", rcEvent.toDebugString)
308 val now = TimeHelpers.nowMillis()
309 val billingMonthInfo = BillingMonthInfo.fromMillis(now)
310 val currentResourcesMap = aquarium.currentResourceTypesMap
311 val calculationReason = RealtimeChargingReason(None, now)
312 val eventOccurredMillis = rcEvent.occurredMillis
314 // DEBUG("Using %s", currentResourceTypesMap.toJsonString)
315 if(rcEvent.occurredMillis >= _workingUserState.occurredMillis) {
316 chargingService.processResourceEvent(
318 this._workingUserState,
325 // Oops. Event is OUT OF SYNC
326 DEBUG("OUT OF SYNC %s", rcEvent.toDebugString)
327 this._workingUserState = chargingService.replayMonthChargingUpTo(
329 // Take into account that the event may be out-of-sync.
330 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
331 now max eventOccurredMillis,
332 this._userStateBootstrap,
335 stdUserStateStoreFunc,
340 DEBUG("Updated %s", this._workingUserState)
344 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
345 val userID = event.userID
347 (haveUserCreationIMEvent, haveWorkingUserState) match {
349 // (User CREATEd, with balance state)
350 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
353 // (User CREATEd, no balance state)
354 // Return the default initial balance
355 sender ! GetUserBalanceResponse(
357 GetUserBalanceResponseData(
359 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
363 // (Not CREATEd, with balance state)
364 // Clearly this is internal error
365 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
367 case (false, false) ⇒
368 // (Not CREATEd, no balance state)
369 // The user is completely unknown
370 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
374 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
375 haveWorkingUserState match {
377 sender ! GetUserStateResponse(Right(this._workingUserState))
380 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
384 private[this] def D_userID = {
388 private[this] def DEBUG(fmt: String, args: Any*) =
389 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
391 private[this] def INFO(fmt: String, args: Any*) =
392 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
394 private[this] def WARN(fmt: String, args: Any*) =
395 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
397 private[this] def ERROR(fmt: String, args: Any*) =
398 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
400 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
401 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)