Several fixes here and there => Removed all exceptions caused by Avro. Rolled back...
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
54 import gr.grnet.aquarium.charging.bill.AbstractBillEntry
55 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
56 import gr.grnet.aquarium.computation.BillingMonthInfo
57 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg}
58 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
59 import gr.grnet.aquarium.service.event.BalanceEvent
60 import gr.grnet.aquarium.util.date.TimeHelpers
61 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
62
63 /**
64  *
65  * @author Christos KK Loverdos <loverdos@gmail.com>
66  */
67
68 class UserActor extends ReflectiveRoleableActor {
69   private[this] var _userID: String = "<?>"
70   private[this] var _userState: UserStateModel = _
71   private[this] var _userCreationIMEvent: IMEventMsg = _
72   private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
73   private[this] var _latestIMEventOriginalID: String = ""
74   private[this] var _latestResourceEventOriginalID: String = ""
75   private[this] var _userStateBootstrap: UserStateBootstrap = _
76
77   def unsafeUserID = {
78     if(!haveUserID) {
79       throw new AquariumInternalError("%s not initialized")
80     }
81
82     this._userID
83   }
84
85   override def postStop() {
86     DEBUG("I am finally stopped (in postStop())")
87     aquarium.akkaService.notifyUserActorPostStop(this)
88   }
89
90   private[this] def shutmedown(): Unit = {
91     if(haveUserID) {
92       aquarium.akkaService.invalidateUserActor(this)
93     }
94   }
95
96   override protected def onThrowable(t: Throwable, message: AnyRef) = {
97     LogHelpers.logChainOfCauses(logger, t)
98     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
99
100     shutmedown()
101   }
102
103   def role = UserActorRole
104
105   private[this] def chargingService = aquarium.chargingService
106
107   private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
108     aquarium.userStateStore.insertUserState(userState)
109   }
110
111   @inline private[this] def haveUserID = {
112     this._userID ne null
113   }
114
115   @inline private[this] def haveUserCreationIMEvent = {
116     this._userCreationIMEvent ne null
117   }
118
119   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
120   }
121
122   @inline private[this] def haveAgreements = {
123     (this._userAgreementHistoryModel ne null) && this._userAgreementHistoryModel.size > 0
124   }
125
126   @inline private[this] def haveUserState = {
127     this._userState ne null
128   }
129
130   @inline private[this] def haveUserStateBootstrap = {
131     this._userStateBootstrap ne null
132   }
133
134   private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = {
135     val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
136     if(isCreateUser) {
137       if(haveUserCreationIMEvent) {
138         throw new AquariumInternalError(
139           "Got user creation event (id=%s) but I already have one (id=%s)",
140             this._userCreationIMEvent.getOriginalID,
141             imEvent.getOriginalID
142         )
143       }
144
145       this._userCreationIMEvent = imEvent
146     }
147
148     val effectiveFromMillis = imEvent.getOccurredMillis
149     val role = imEvent.getRole
150     // calling unsafe just for the side-effect
151     assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis))
152
153     // add to model (will update the underlying messages as well)
154     if(this._userAgreementHistoryModel eq null) {
155       this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, imEvent.getOriginalID)
156     } else {
157       val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
158       this._userAgreementHistoryModel += newUserAgreementModel
159     }
160   }
161
162   private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
163     this._latestIMEventOriginalID = imEvent.getOriginalID
164   }
165
166   private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = {
167     this._latestResourceEventOriginalID = rcEvent.getOriginalID
168   }
169
170   /**
171    * Creates the initial state that is related to IMEvents.
172    */
173   private[this] def initializeStateOfIMEvents(): Unit = {
174     // NOTE: this._userID is already set up by onInitializeUserActorState()
175     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
176       DEBUG("Replaying %s", imEvent)
177
178       updateAgreementHistoryFrom(imEvent)
179       updateLatestIMEventIDFrom(imEvent)
180     }
181
182     if(haveAgreements) {
183       DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString)
184       logSeparator()
185     }
186   }
187
188   /**
189    * Resource events are processed only if the user has been created and has agreements.
190    * Otherwise nothing can be computed.
191    */
192   private[this] def shouldProcessResourceEvents: Boolean = {
193     haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
194   }
195
196   private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = {
197     assert(this.haveAgreements, "this.haveAgreements")
198     assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
199
200     if(!haveUserStateBootstrap) {
201       this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
202     }
203     logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString))
204     val now = TimeHelpers.nowMillis()
205     this._userState = chargingService.replayMonthChargingUpTo(
206       BillingMonthInfo.fromMillis(now),
207       now,
208       this._userStateBootstrap,
209       aquarium.currentResourceTypesMap,
210       aquarium.userStateStore.insertUserState
211     )
212
213     // Final touch: Update agreement history in the working user state.
214     // The assumption is that all agreement changes go via IMEvents, so the
215     // state this._workingAgreementHistory is always the authoritative source.
216     if(haveUserState) {
217       this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel
218       DEBUG("Computed working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
219     }
220   }
221
222   private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
223     if(!this.haveAgreements) {
224       DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
225       return
226     }
227
228     if(!this.haveUserCreationIMEvent) {
229       DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
230       return
231     }
232
233     // We will also need this functionality when receiving IMEvents, so we place it in a method
234     loadUserStateAndUpdateAgreementHistory()
235
236     if(haveUserState) {
237       DEBUG("Initial working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
238       logSeparator()
239     }
240   }
241
242   def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
243     this._userID = event.userID
244     DEBUG("Got %s", event)
245
246     initializeStateOfIMEvents()
247     initializeStateOfResourceEvents(event)
248   }
249
250   /**
251    * Process [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s.
252    * When this method is called, we assume that all proper checks have been made and it
253    * is OK to proceed with the event processing.
254    */
255   def onIMEventMsg(imEvent: IMEventMsg): Unit = {
256     val hadUserCreationIMEvent = haveUserCreationIMEvent
257
258     if(!haveAgreements) {
259       // This IMEvent has arrived after any ResourceEvents
260       INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
261       initializeStateOfIMEvents()
262     }
263     else {
264       if(this._latestIMEventOriginalID == imEvent.getOriginalID) {
265         // This happens when the actor is brought to life, then immediately initialized, and then
266         // sent the first IM event. But from the initialization procedure, this IM event will have
267         // already been loaded from DB!
268         INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
269         logSeparator()
270
271         //this._latestIMEventID = imEvent.id
272         return
273       }
274
275       updateAgreementHistoryFrom(imEvent)
276       updateLatestIMEventIDFrom(imEvent)
277     }
278
279     // Must also update user state if we know when in history the life of a user begins
280     if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
281       INFO("Processing user state, since we had a CREATE IMEvent")
282       loadUserStateAndUpdateAgreementHistory()
283     }
284
285     logSeparator()
286   }
287
288   def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
289     if(!shouldProcessResourceEvents) {
290       // This means the user has not been created (at least, as far as Aquarium is concerned).
291       // So, we do not process any resource event
292       DEBUG("Not processing %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
293       logSeparator()
294
295       return
296     }
297
298     // Since the latest resource event per resource is recorded in the user state,
299     // we do not need to query the store. Just query the in-memory state.
300     // Note: This is a similar situation with the first IMEvent received right after the user
301     //       actor is created.
302     if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) {
303       INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
304       logSeparator()
305
306       return
307     }
308
309     val now = TimeHelpers.nowMillis()
310     // TODO: Review this and its usage in user state.
311     // TODO: The assumption is that the resource set increases all the time,
312     // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
313     val currentResourcesMap = aquarium.currentResourceTypesMap
314
315     val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
316     val nowYear = nowBillingMonthInfo.year
317     val nowMonth = nowBillingMonthInfo.month
318
319     val eventOccurredMillis = rcEvent.getOccurredMillis
320     val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
321     val eventYear = eventBillingMonthInfo.year
322     val eventMonth = eventBillingMonthInfo.month
323
324     def computeBatch(): Unit = {
325       DEBUG("Going for out of sync charging")
326       this._userState = chargingService.replayMonthChargingUpTo(
327         nowBillingMonthInfo,
328         // Take into account that the event may be out-of-sync.
329         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
330         now max eventOccurredMillis,
331         this._userStateBootstrap,
332         currentResourcesMap,
333         stdUserStateStoreFunc
334       )
335
336       updateLatestResourceEventIDFrom(rcEvent)
337     }
338
339     def computeRealtime(): Unit = {
340       DEBUG("Going for in sync charging")
341       chargingService.processResourceEvent(
342         rcEvent,
343         this._userState,
344         nowBillingMonthInfo,
345         true
346       )
347
348       updateLatestResourceEventIDFrom(rcEvent)
349     }
350
351     val oldTotalCredits =
352       if(this._userState!=null)
353         this._userState.totalCredits
354       else
355         0.0D
356     // FIXME check these
357     if(this._userState eq null) {
358       computeBatch()
359     }
360     else if(nowYear != eventYear || nowMonth != eventMonth) {
361       DEBUG(
362         "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
363         nowYear, eventYear,
364         nowMonth, eventMonth
365       )
366       computeBatch()
367     }
368     else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
369       DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
370       DEBUG(
371         "%s < %s",
372         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
373         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
374       )
375       computeRealtime()
376     }
377     else {
378       DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
379                 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
380                 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
381
382       computeBatch()
383     }
384     val newTotalCredits = this._userState.totalCredits
385     if(oldTotalCredits * newTotalCredits < 0)
386       aquarium.eventBus ! new BalanceEvent(this._userState.userID,
387                                            newTotalCredits>=0)
388     DEBUG("Updated %s", this._userState)
389     logSeparator()
390   }
391
392   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
393     try{
394       val timeslot = event.timeslot
395       val state= if(haveUserState) Some(this._userState.msg) else None
396       val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state)
397       val billData = GetUserBillResponseData(this._userID,billEntry)
398       sender ! GetUserBillResponse(Right(billData))
399     } catch {
400       case e:Exception =>
401        e.printStackTrace()
402        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
403     }
404   }
405
406   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
407     val userID = event.userID
408
409     (haveUserCreationIMEvent, haveUserState) match {
410       case (true, true) ⇒
411         // (User CREATEd, with balance state)
412         val realtimeMillis = TimeHelpers.nowMillis()
413         chargingService.calculateRealtimeUserState(
414           this._userState,
415           BillingMonthInfo.fromMillis(realtimeMillis),
416           realtimeMillis
417         )
418
419         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userState.totalCredits)))
420
421       case (true, false) ⇒
422         // (User CREATEd, no balance state)
423         // Return the default initial balance
424         sender ! GetUserBalanceResponse(
425           Right(
426             GetUserBalanceResponseData(
427               this._userID,
428               aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis)
429         )))
430
431       case (false, true) ⇒
432         // (Not CREATEd, with balance state)
433         // Clearly this is internal error
434         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
435
436       case (false, false) ⇒
437         // (Not CREATEd, no balance state)
438         // The user is completely unknown
439         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
440     }
441   }
442
443   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
444     haveUserState match {
445       case true ⇒
446         val realtimeMillis = TimeHelpers.nowMillis()
447         chargingService.calculateRealtimeUserState(
448           this._userState,
449           BillingMonthInfo.fromMillis(realtimeMillis),
450           realtimeMillis
451         )
452
453         sender ! GetUserStateResponse(Right(this._userState.msg))
454
455       case false ⇒
456         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
457     }
458   }
459
460   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
461     haveUserState match {
462       case true ⇒
463         DEBUG("haveWorkingUserState: %s", event)
464         val realtimeMillis = TimeHelpers.nowMillis()
465         chargingService.calculateRealtimeUserState(
466           this._userState,
467           BillingMonthInfo.fromMillis(realtimeMillis),
468           realtimeMillis
469         )
470
471         sender ! GetUserWalletResponse(
472           Right(
473             GetUserWalletResponseData(
474               this._userID,
475               this._userState.totalCredits,
476               MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries)
477         )))
478
479       case false ⇒
480         DEBUG("!haveWorkingUserState: %s", event)
481         haveUserCreationIMEvent match {
482           case true ⇒
483             DEBUG("haveUserCreationIMEvent: %s", event)
484             sender ! GetUserWalletResponse(
485               Right(
486                 GetUserWalletResponseData(
487                   this._userID,
488                   aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis),
489                   MessageFactory.newWalletEntriesMsg()
490             )))
491
492           case false ⇒
493             DEBUG("!haveUserCreationIMEvent: %s", event)
494             sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
495         }
496     }
497   }
498
499   private[this] def D_userID = {
500     this._userID
501   }
502
503   private[this] def DEBUG(fmt: String, args: Any*) =
504     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
505
506   private[this] def INFO(fmt: String, args: Any*) =
507     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
508
509   private[this] def WARN(fmt: String, args: Any*) =
510     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
511
512   private[this] def ERROR(fmt: String, args: Any*) =
513     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
514
515   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
516     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
517 }