Wallets go REST
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserWalletResponseData, GetUserWalletResponse, GetUserWalletRequest, GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.charging.state.UserStateBootstrap
51 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
52 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
53 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
54 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
55
56 /**
57  *
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  */
60
61 class UserActor extends ReflectiveRoleableActor {
62   private[this] var _userID: String = "<?>"
63   private[this] var _workingUserState: WorkingUserState = _
64   private[this] var _userCreationIMEvent: IMEventModel = _
65   private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
66   private[this] var _latestIMEventID: String = ""
67   private[this] var _latestResourceEventID: String = ""
68   private[this] var _userStateBootstrap: UserStateBootstrap = _
69
70   def unsafeUserID = {
71     if(!haveUserID) {
72       throw new AquariumInternalError("%s not initialized")
73     }
74
75     this._userID
76   }
77
78   override def postStop() {
79     DEBUG("I am finally stopped (in postStop())")
80     aquarium.akkaService.notifyUserActorPostStop(this)
81   }
82
83   private[this] def shutmedown(): Unit = {
84     if(haveUserID) {
85       aquarium.akkaService.invalidateUserActor(this)
86     }
87   }
88
89   override protected def onThrowable(t: Throwable, message: AnyRef) = {
90     LogHelpers.logChainOfCauses(logger, t)
91     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
92
93     shutmedown()
94   }
95
96   def role = UserActorRole
97
98   private[this] def chargingService = aquarium.chargingService
99
100   private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
101     aquarium.userStateStore.insertUserState(userState)
102   }
103
104   @inline private[this] def haveUserID = {
105     this._userID ne null
106   }
107
108   @inline private[this] def haveUserCreationIMEvent = {
109     this._userCreationIMEvent ne null
110   }
111
112   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
113   }
114
115   @inline private[this] def haveAgreements = {
116     this._workingAgreementHistory.size > 0
117   }
118
119   @inline private[this] def haveWorkingUserState = {
120     this._workingUserState ne null
121   }
122
123   @inline private[this] def haveUserStateBootstrap = {
124     this._userStateBootstrap ne null
125   }
126
127   private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
128     if(imEvent.isCreateUser) {
129       if(haveUserCreationIMEvent) {
130         throw new AquariumInternalError(
131           "Got user creation event (id=%s) but I already have one (id=%s)",
132             this._userCreationIMEvent.id,
133             imEvent.id
134         )
135       }
136
137       this._userCreationIMEvent = imEvent
138     }
139
140     val effectiveFromMillis = imEvent.occurredMillis
141     val role = imEvent.role
142     // calling unsafe just for the side-effect
143     assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
144
145     val newAgreement = StdUserAgreement(
146       imEvent.id,
147       Some(imEvent.id),
148       effectiveFromMillis,
149       Long.MaxValue,
150       role,
151       PolicyDefinedFullPriceTableRef
152     )
153
154     this._workingAgreementHistory += newAgreement
155   }
156
157   private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
158     this._latestIMEventID = imEvent.id
159   }
160
161   private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
162     this._latestResourceEventID = rcEvent.id
163   }
164
165   /**
166    * Creates the initial state that is related to IMEvents.
167    */
168   private[this] def initializeStateOfIMEvents(): Unit = {
169     // NOTE: this._userID is already set up by onInitializeUserActorState()
170     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
171       DEBUG("Replaying %s", imEvent)
172
173       updateAgreementHistoryFrom(imEvent)
174       updateLatestIMEventIDFrom(imEvent)
175     }
176
177     if(haveAgreements) {
178       DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
179       logSeparator()
180     }
181   }
182
183   /**
184    * Resource events are processed only if the user has been created and has agreements.
185    * Otherwise nothing can be computed.
186    */
187   private[this] def shouldProcessResourceEvents: Boolean = {
188     haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
189   }
190
191   private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
192     assert(this.haveAgreements, "this.haveAgreements")
193     assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
194
195     val userCreationMillis = this._userCreationIMEvent.occurredMillis
196     val userCreationRole = this._userCreationIMEvent.role // initial role
197     val userCreationIMEventID = this._userCreationIMEvent.id
198
199     if(!haveUserStateBootstrap) {
200       this._userStateBootstrap = UserStateBootstrap(
201         this._userID,
202         userCreationMillis,
203         aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
204         aquarium.initialUserBalance(userCreationRole, userCreationMillis)
205       )
206     }
207
208     val now = TimeHelpers.nowMillis()
209     this._workingUserState = chargingService.replayMonthChargingUpTo(
210       BillingMonthInfo.fromMillis(now),
211       now,
212       this._userStateBootstrap,
213       aquarium.currentResourceTypesMap,
214       InitialUserActorSetup(),
215       aquarium.userStateStore.insertUserState,
216       None
217     )
218
219     // Final touch: Update agreement history in the working user state.
220     // The assumption is that all agreement changes go via IMEvents, so the
221     // state this._workingAgreementHistory is always the authoritative source.
222     if(haveWorkingUserState) {
223       this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
224       DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
225     }
226   }
227
228   private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
229     if(!this.haveAgreements) {
230       DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
231       return
232     }
233
234     if(!this.haveUserCreationIMEvent) {
235       DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
236       return
237     }
238
239     // We will also need this functionality when receiving IMEvents, so we place it in a method
240     loadWorkingUserStateAndUpdateAgreementHistory()
241
242     if(haveWorkingUserState) {
243       DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
244       logSeparator()
245     }
246   }
247
248   def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
249     this._userID = event.userID
250     DEBUG("Got %s", event)
251
252     initializeStateOfIMEvents()
253     initializeStateOfResourceEvents(event)
254   }
255
256   /**
257    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
258    * When this method is called, we assume that all proper checks have been made and it
259    * is OK to proceed with the event processing.
260    */
261   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
262     val imEvent = processEvent.imEvent
263     val hadUserCreationIMEvent = haveUserCreationIMEvent
264
265     if(!haveAgreements) {
266       // This is an error. Should have been initialized from somewhere ...
267       throw new AquariumInternalError("No agreements. Cannot process %s", processEvent)
268     }
269
270     if(this._latestIMEventID == imEvent.id) {
271       // This happens when the actor is brought to life, then immediately initialized, and then
272       // sent the first IM event. But from the initialization procedure, this IM event will have
273       // already been loaded from DB!
274       INFO("Ignoring first %s", imEvent.toDebugString)
275       logSeparator()
276
277       //this._latestIMEventID = imEvent.id
278       return
279     }
280
281     updateAgreementHistoryFrom(imEvent)
282     updateLatestIMEventIDFrom(imEvent)
283
284     // Must also update user state if we know when in history the life of a user begins
285     if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
286       loadWorkingUserStateAndUpdateAgreementHistory()
287     }
288   }
289
290   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
291     val rcEvent = event.rcEvent
292
293     if(!shouldProcessResourceEvents) {
294       // This means the user has not been created (at least, as far as Aquarium is concerned).
295       // So, we do not process any resource event
296       DEBUG("Not processing %s", rcEvent.toJsonString)
297       logSeparator()
298
299       return
300     }
301
302     // Since the latest resource event per resource is recorded in the user state,
303     // we do not need to query the store. Just query the in-memory state.
304     // Note: This is a similar situation with the first IMEvent received right after the user
305     //       actor is created.
306     if(this._latestResourceEventID == rcEvent.id) {
307       INFO("Ignoring first %s", rcEvent.toDebugString)
308       logSeparator()
309
310       return
311     }
312
313     val now = TimeHelpers.nowMillis()
314     val currentResourcesMap = aquarium.currentResourceTypesMap
315     val chargingReason = RealtimeChargingReason(None, now)
316
317     val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
318     val nowYear = nowBillingMonthInfo.year
319     val nowMonth = nowBillingMonthInfo.month
320
321     val eventOccurredMillis = rcEvent.occurredMillis
322     val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
323     val eventYear = eventBillingMonthInfo.year
324     val eventMonth = eventBillingMonthInfo.month
325
326     def computeBatch(): Unit = {
327       DEBUG("Going for out of sync charging")
328       this._workingUserState = chargingService.replayMonthChargingUpTo(
329         nowBillingMonthInfo,
330         // Take into account that the event may be out-of-sync.
331         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
332         now max eventOccurredMillis,
333         this._userStateBootstrap,
334         currentResourcesMap,
335         chargingReason,
336         stdUserStateStoreFunc,
337         None
338       )
339
340       updateLatestResourceEventIDFrom(rcEvent)
341     }
342
343     def computeRealtime(): Unit = {
344       DEBUG("Going for in sync charging")
345       chargingService.processResourceEvent(
346         rcEvent,
347         this._workingUserState,
348         chargingReason,
349         nowBillingMonthInfo,
350         None,
351         true
352       )
353
354       updateLatestResourceEventIDFrom(rcEvent)
355     }
356
357     // FIXME check these
358     if(nowYear != eventYear || nowMonth != eventMonth) {
359       DEBUG(
360         "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
361         nowYear, eventYear,
362         nowMonth, eventMonth
363       )
364       computeBatch()
365     }
366     else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
367       DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
368       DEBUG(
369         "%s < %s",
370         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
371         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
372       )
373       computeRealtime()
374     }
375     else {
376       computeBatch()
377     }
378
379     DEBUG("Updated %s", this._workingUserState)
380     logSeparator()
381   }
382
383   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
384     val userID = event.userID
385
386     (haveUserCreationIMEvent, haveWorkingUserState) match {
387       case (true, true) ⇒
388         // (User CREATEd, with balance state)
389         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
390
391       case (true, false) ⇒
392         // (User CREATEd, no balance state)
393         // Return the default initial balance
394         sender ! GetUserBalanceResponse(
395           Right(
396             GetUserBalanceResponseData(
397               this._userID,
398               aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
399         )))
400
401       case (false, true) ⇒
402         // (Not CREATEd, with balance state)
403         // Clearly this is internal error
404         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
405
406       case (false, false) ⇒
407         // (Not CREATEd, no balance state)
408         // The user is completely unknown
409         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
410     }
411   }
412
413   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
414     haveWorkingUserState match {
415       case true ⇒
416         sender ! GetUserStateResponse(Right(this._workingUserState))
417
418       case false ⇒
419         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
420     }
421   }
422
423   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
424     haveWorkingUserState match {
425       case true ⇒
426         DEBUG("haveWorkingUserState: %s", event)
427         sender ! GetUserWalletResponse(
428           Right(
429             GetUserWalletResponseData(
430               this._userID,
431               this._workingUserState.totalCredits,
432               this._workingUserState.walletEntries.toList
433         )))
434
435       case false ⇒
436         DEBUG("!haveWorkingUserState: %s", event)
437         haveUserCreationIMEvent match {
438           case true ⇒
439             DEBUG("haveUserCreationIMEvent: %s", event)
440             sender ! GetUserWalletResponse(
441               Right(
442                 GetUserWalletResponseData(
443                   this._userID,
444                   aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
445                   Nil
446             )))
447
448           case false ⇒
449             DEBUG("!haveUserCreationIMEvent: %s", event)
450             sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
451         }
452     }
453   }
454
455   private[this] def D_userID = {
456     this._userID
457   }
458
459   private[this] def DEBUG(fmt: String, args: Any*) =
460     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
461
462   private[this] def INFO(fmt: String, args: Any*) =
463     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
464
465   private[this] def WARN(fmt: String, args: Any*) =
466     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
467
468   private[this] def ERROR(fmt: String, args: Any*) =
469     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
470
471   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
472     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
473 }