Applying the new policy semantics everywhere
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.computation.state.UserStateBootstrap
51 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
52 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
53 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
54
55 /**
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 class UserActor extends ReflectiveRoleableActor {
61   private[this] var _userID: String = "<?>"
62   private[this] var _workingUserState: WorkingUserState = _
63   private[this] var _userCreationIMEvent: IMEventModel = _
64   private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
65   private[this] var _latestIMEventID: String = ""
66   private[this] var _latestResourceEventID: String = ""
67   private[this] var _userStateBootstrap: UserStateBootstrap = _
68
69   def unsafeUserID = {
70     if(!haveUserID) {
71       throw new AquariumInternalError("%s not initialized")
72     }
73
74     this._userID
75   }
76
77   override def postStop() {
78     DEBUG("I am finally stopped (in postStop())")
79     aquarium.akkaService.notifyUserActorPostStop(this)
80   }
81
82   private[this] def shutmedown(): Unit = {
83     if(haveUserID) {
84       aquarium.akkaService.invalidateUserActor(this)
85     }
86   }
87
88   override protected def onThrowable(t: Throwable, message: AnyRef) = {
89     LogHelpers.logChainOfCauses(logger, t)
90     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
91
92     shutmedown()
93   }
94
95   def role = UserActorRole
96
97   private[this] def chargingService = aquarium.chargingService
98
99   private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
100     aquarium.userStateStore.insertUserState(userState)
101   }
102
103   @inline private[this] def haveUserID = {
104     this._userID ne null
105   }
106
107   @inline private[this] def haveUserCreationIMEvent = {
108     this._userCreationIMEvent ne null
109   }
110
111   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
112   }
113
114   @inline private[this] def haveAgreements = {
115     this._workingAgreementHistory.size > 0
116   }
117
118   @inline private[this] def haveWorkingUserState = {
119     this._workingUserState ne null
120   }
121
122   @inline private[this] def haveUserStateBootstrap = {
123     this._userStateBootstrap ne null
124   }
125
126   private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
127     if(imEvent.isCreateUser) {
128       if(haveUserCreationIMEvent) {
129         throw new AquariumInternalError(
130           "Got user creation event (id=%s) but I already have one (id=%s)",
131             this._userCreationIMEvent.id,
132             imEvent.id
133         )
134       }
135
136       this._userCreationIMEvent = imEvent
137     }
138
139     val effectiveFromMillis = imEvent.occurredMillis
140     val role = imEvent.role
141     // calling unsafe just for the side-effect
142     assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
143
144     val newAgreement = StdUserAgreement(
145       imEvent.id,
146       Some(imEvent.id),
147       effectiveFromMillis,
148       Long.MaxValue,
149       role,
150       PolicyDefinedFullPriceTableRef
151     )
152
153     this._workingAgreementHistory += newAgreement
154   }
155
156   private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
157     this._latestIMEventID = imEvent.id
158   }
159
160   /**
161    * Creates the initial state that is related to IMEvents.
162    */
163   private[this] def initializeStateOfIMEvents(): Unit = {
164     // NOTE: this._userID is already set up by onInitializeUserActorState()
165     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
166       DEBUG("Replaying %s", imEvent)
167
168       updateAgreementHistoryFrom(imEvent)
169       updateLatestIMEventIDFrom(imEvent)
170     }
171
172     if(haveAgreements) {
173       DEBUG("Initial %s", this._workingAgreementHistory.toJsonString)
174       logSeparator()
175     }
176   }
177
178   /**
179    * Resource events are processed only if the user has been created and has agreements.
180    * Otherwise nothing can be computed.
181    */
182   private[this] def shouldProcessResourceEvents: Boolean = {
183     haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
184   }
185
186   private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
187     assert(this.haveAgreements, "this.haveAgreements")
188     assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
189
190     val userCreationMillis = this._userCreationIMEvent.occurredMillis
191     val userCreationRole = this._userCreationIMEvent.role // initial role
192     val userCreationIMEventID = this._userCreationIMEvent.id
193
194     if(!haveUserStateBootstrap) {
195       this._userStateBootstrap = UserStateBootstrap(
196         this._userID,
197         userCreationMillis,
198         aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
199         aquarium.initialUserBalance(userCreationRole, userCreationMillis)
200       )
201     }
202
203     val now = TimeHelpers.nowMillis()
204     this._workingUserState = chargingService.replayMonthChargingUpTo(
205       BillingMonthInfo.fromMillis(now),
206       now,
207       this._userStateBootstrap,
208       aquarium.currentResourceTypesMap,
209       InitialUserActorSetup(),
210       aquarium.userStateStore.insertUserState,
211       None
212     )
213
214     // Final touch: Update agreement history in the working user state.
215     // The assumption is that all agreement changes go via IMEvents, so the
216     // state this._workingAgreementHistory is always the authoritative source.
217     if(haveWorkingUserState) {
218       this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
219       DEBUG("Computed %s", this._workingUserState.toJsonString)
220     }
221   }
222
223   private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
224     if(!this.haveAgreements) {
225       DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
226       return
227     }
228
229     if(!this.haveUserCreationIMEvent) {
230       DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
231       return
232     }
233
234     // We will also need this functionality when receiving IMEvents, so we place it in a method
235     loadWorkingUserStateAndUpdateAgreementHistory()
236
237     if(haveWorkingUserState) {
238       DEBUG("Initial %s", this._workingUserState.toJsonString)
239       logSeparator()
240     }
241   }
242
243   def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
244     this._userID = event.userID
245     DEBUG("Got %s", event)
246
247     initializeStateOfIMEvents()
248     initializeStateOfResourceEvents(event)
249   }
250
251   /**
252    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
253    * When this method is called, we assume that all proper checks have been made and it
254    * is OK to proceed with the event processing.
255    */
256   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
257     val imEvent = processEvent.imEvent
258     val hadUserCreationIMEvent = haveUserCreationIMEvent
259
260     if(!haveAgreements) {
261       // This is an error. Should have been initialized from somewhere ...
262       throw new AquariumInternalError("No agreements. Cannot process %s", processEvent)
263     }
264
265     if(this._latestIMEventID == imEvent.id) {
266       // This happens when the actor is brought to life, then immediately initialized, and then
267       // sent the first IM event. But from the initialization procedure, this IM event will have
268       // already been loaded from DB!
269       INFO("Ignoring first %s", imEvent.toDebugString)
270       logSeparator()
271
272       //this._latestIMEventID = imEvent.id
273       return
274     }
275
276     updateAgreementHistoryFrom(imEvent)
277     updateLatestIMEventIDFrom(imEvent)
278
279     // Must also update user state if we know when in history the life of a user begins
280     if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
281       loadWorkingUserStateAndUpdateAgreementHistory()
282     }
283   }
284
285   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
286     val rcEvent = event.rcEvent
287
288     if(!shouldProcessResourceEvents) {
289       // This means the user has not been created (at least, as far as Aquarium is concerned).
290       // So, we do not process any resource event
291       DEBUG("Not processing %s", rcEvent.toJsonString)
292       logSeparator()
293
294       return
295     }
296
297     // Since the latest resource event per resource is recorded in the user state,
298     // we do not need to query the store. Just query the in-memory state.
299     // Note: This is a similar situation with the first IMEvent received right after the user
300     //       actor is created.
301     if(this._latestResourceEventID == rcEvent.id) {
302       INFO("Ignoring first %s", rcEvent.toDebugString)
303       logSeparator()
304
305       return
306     }
307
308     val now = TimeHelpers.nowMillis()
309     val billingMonthInfo = BillingMonthInfo.fromMillis(now)
310     val currentResourcesMap = aquarium.currentResourceTypesMap
311     val calculationReason = RealtimeChargingReason(None, now)
312     val eventOccurredMillis = rcEvent.occurredMillis
313
314 //    DEBUG("Using %s", currentResourceTypesMap.toJsonString)
315     if(rcEvent.occurredMillis >= _workingUserState.occurredMillis) {
316       chargingService.processResourceEvent(
317         rcEvent,
318         this._workingUserState,
319         calculationReason,
320         billingMonthInfo,
321         None
322       )
323     }
324     else {
325       // Oops. Event is OUT OF SYNC
326       DEBUG("OUT OF SYNC %s", rcEvent.toDebugString)
327       this._workingUserState = chargingService.replayMonthChargingUpTo(
328         billingMonthInfo,
329         // Take into account that the event may be out-of-sync.
330         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
331         now max eventOccurredMillis,
332         this._userStateBootstrap,
333         currentResourcesMap,
334         calculationReason,
335         stdUserStateStoreFunc,
336         None
337       )
338     }
339
340     DEBUG("Updated %s", this._workingUserState)
341     logSeparator()
342   }
343
344   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
345     val userID = event.userID
346
347     (haveUserCreationIMEvent, haveWorkingUserState) match {
348       case (true, true) ⇒
349         // (User CREATEd, with balance state)
350         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
351
352       case (true, false) ⇒
353         // (User CREATEd, no balance state)
354         // Return the default initial balance
355         sender ! GetUserBalanceResponse(
356           Right(
357             GetUserBalanceResponseData(
358               this._userID,
359               aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
360         )))
361
362       case (false, true) ⇒
363         // (Not CREATEd, with balance state)
364         // Clearly this is internal error
365         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
366
367       case (false, false) ⇒
368         // (Not CREATEd, no balance state)
369         // The user is completely unknown
370         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
371     }
372   }
373
374   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
375     haveWorkingUserState match {
376       case true ⇒
377         sender ! GetUserStateResponse(Right(this._workingUserState))
378
379       case false ⇒
380         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
381     }
382   }
383
384   private[this] def D_userID = {
385     this._userID
386   }
387
388   private[this] def DEBUG(fmt: String, args: Any*) =
389     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
390
391   private[this] def INFO(fmt: String, args: Any*) =
392     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
393
394   private[this] def WARN(fmt: String, args: Any*) =
395     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
396
397   private[this] def ERROR(fmt: String, args: Any*) =
398     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
399
400   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
401     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
402 }