65d14ff2c4f1eaa91edec3cace97e536fbd1c0ae
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.service.event.BalanceEvent
46 import gr.grnet.aquarium.event.model.im.IMEventModel
47 import message._
48 import config.AquariumPropertiesLoaded
49 import config.InitializeUserActorState
50 import event.ProcessIMEvent
51 import event.ProcessResourceEvent
52 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
53 import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.charging.state.UserStateBootstrap
56 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
57 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
58 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
59 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
60 import message.GetUserBalanceRequest
61 import message.GetUserBalanceResponse
62 import message.GetUserBalanceResponseData
63 import message.GetUserStateRequest
64 import message.GetUserStateResponse
65 import message.GetUserWalletRequest
66 import message.GetUserWalletResponse
67 import message.GetUserWalletResponseData
68 import scala.Left
69 import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
70 import scala.Some
71 import scala.Right
72 import gr.grnet.aquarium.policy.StdUserAgreement
73 import gr.grnet.aquarium.charging.state.UserStateBootstrap
74 import gr.grnet.aquarium.charging.bill.BillEntry
75
76 /**
77  *
78  * @author Christos KK Loverdos <loverdos@gmail.com>
79  */
80
81 class UserActor extends ReflectiveRoleableActor {
82   private[this] var _userID: String = "<?>"
83   private[this] var _workingUserState: WorkingUserState = _
84   private[this] var _userCreationIMEvent: IMEventModel = _
85   private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
86   private[this] var _latestIMEventID: String = ""
87   private[this] var _latestResourceEventID: String = ""
88   private[this] var _userStateBootstrap: UserStateBootstrap = _
89
90   def unsafeUserID = {
91     if(!haveUserID) {
92       throw new AquariumInternalError("%s not initialized")
93     }
94
95     this._userID
96   }
97
98   override def postStop() {
99     DEBUG("I am finally stopped (in postStop())")
100     aquarium.akkaService.notifyUserActorPostStop(this)
101   }
102
103   private[this] def shutmedown(): Unit = {
104     if(haveUserID) {
105       aquarium.akkaService.invalidateUserActor(this)
106     }
107   }
108
109   override protected def onThrowable(t: Throwable, message: AnyRef) = {
110     LogHelpers.logChainOfCauses(logger, t)
111     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
112
113     shutmedown()
114   }
115
116   def role = UserActorRole
117
118   private[this] def chargingService = aquarium.chargingService
119
120   private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
121     aquarium.userStateStore.insertUserState(userState)
122   }
123
124   @inline private[this] def haveUserID = {
125     this._userID ne null
126   }
127
128   @inline private[this] def haveUserCreationIMEvent = {
129     this._userCreationIMEvent ne null
130   }
131
132   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
133   }
134
135   @inline private[this] def haveAgreements = {
136     this._workingAgreementHistory.size > 0
137   }
138
139   @inline private[this] def haveWorkingUserState = {
140     this._workingUserState ne null
141   }
142
143   @inline private[this] def haveUserStateBootstrap = {
144     this._userStateBootstrap ne null
145   }
146
147   private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
148     if(imEvent.isCreateUser) {
149       if(haveUserCreationIMEvent) {
150         throw new AquariumInternalError(
151           "Got user creation event (id=%s) but I already have one (id=%s)",
152             this._userCreationIMEvent.id,
153             imEvent.id
154         )
155       }
156
157       this._userCreationIMEvent = imEvent
158     }
159
160     val effectiveFromMillis = imEvent.occurredMillis
161     val role = imEvent.role
162     // calling unsafe just for the side-effect
163     assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
164
165     val newAgreement = StdUserAgreement(
166       imEvent.id,
167       Some(imEvent.id),
168       effectiveFromMillis,
169       Long.MaxValue,
170       role,
171       PolicyDefinedFullPriceTableRef
172     )
173
174     this._workingAgreementHistory += newAgreement
175   }
176
177   private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
178     this._latestIMEventID = imEvent.id
179   }
180
181   private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
182     this._latestResourceEventID = rcEvent.id
183   }
184
185   /**
186    * Creates the initial state that is related to IMEvents.
187    */
188   private[this] def initializeStateOfIMEvents(): Unit = {
189     // NOTE: this._userID is already set up by onInitializeUserActorState()
190     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
191       DEBUG("Replaying %s", imEvent)
192
193       updateAgreementHistoryFrom(imEvent)
194       updateLatestIMEventIDFrom(imEvent)
195     }
196
197     if(haveAgreements) {
198       DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
199       logSeparator()
200     }
201   }
202
203   /**
204    * Resource events are processed only if the user has been created and has agreements.
205    * Otherwise nothing can be computed.
206    */
207   private[this] def shouldProcessResourceEvents: Boolean = {
208     haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
209   }
210
211   private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
212     assert(this.haveAgreements, "this.haveAgreements")
213     assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
214
215     val userCreationMillis = this._userCreationIMEvent.occurredMillis
216     val userCreationRole = this._userCreationIMEvent.role // initial role
217     val userCreationIMEventID = this._userCreationIMEvent.id
218
219     if(!haveUserStateBootstrap) {
220       this._userStateBootstrap = UserStateBootstrap(
221         this._userID,
222         userCreationMillis,
223         aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
224         aquarium.initialUserBalance(userCreationRole, userCreationMillis)
225       )
226     }
227
228     val now = TimeHelpers.nowMillis()
229     this._workingUserState = chargingService.replayMonthChargingUpTo(
230       BillingMonthInfo.fromMillis(now),
231       now,
232       this._userStateBootstrap,
233       aquarium.currentResourceTypesMap,
234       InitialUserActorSetup(),
235       aquarium.userStateStore.insertUserState,
236       None
237     )
238
239     // Final touch: Update agreement history in the working user state.
240     // The assumption is that all agreement changes go via IMEvents, so the
241     // state this._workingAgreementHistory is always the authoritative source.
242     if(haveWorkingUserState) {
243       this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
244       DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
245     }
246   }
247
248   private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
249     if(!this.haveAgreements) {
250       DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
251       return
252     }
253
254     if(!this.haveUserCreationIMEvent) {
255       DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
256       return
257     }
258
259     // We will also need this functionality when receiving IMEvents, so we place it in a method
260     loadWorkingUserStateAndUpdateAgreementHistory()
261
262     if(haveWorkingUserState) {
263       DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
264       logSeparator()
265     }
266   }
267
268   def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
269     this._userID = event.userID
270     DEBUG("Got %s", event)
271
272     initializeStateOfIMEvents()
273     initializeStateOfResourceEvents(event)
274   }
275
276   /**
277    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
278    * When this method is called, we assume that all proper checks have been made and it
279    * is OK to proceed with the event processing.
280    */
281   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
282     val imEvent = processEvent.imEvent
283     val hadUserCreationIMEvent = haveUserCreationIMEvent
284
285     if(!haveAgreements) {
286       // This IMEvent has arrived after any ResourceEvents
287       INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
288       initializeStateOfIMEvents()
289     }
290     else {
291       if(this._latestIMEventID == imEvent.id) {
292         // This happens when the actor is brought to life, then immediately initialized, and then
293         // sent the first IM event. But from the initialization procedure, this IM event will have
294         // already been loaded from DB!
295         INFO("Ignoring first %s", imEvent.toDebugString)
296         logSeparator()
297
298         //this._latestIMEventID = imEvent.id
299         return
300       }
301       if(imEvent.isAddCredits)  {
302         if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
303         loadWorkingUserStateAndUpdateAgreementHistory()
304         onHandleAddCreditsEvent(imEvent)
305
306       } else {
307       updateAgreementHistoryFrom(imEvent)
308       updateLatestIMEventIDFrom(imEvent)
309       }
310     }
311
312     // Must also update user state if we know when in history the life of a user begins
313     if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
314       INFO("Processing user state, since we had a CREATE IMEvent")
315       loadWorkingUserStateAndUpdateAgreementHistory()
316     }
317
318     logSeparator()
319   }
320
321   /* Convert astakos message for adding credits
322     to a regular RESOURCE message */
323   def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
324     val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
325     val event = new StdResourceEvent(
326       imEvent.id,
327       imEvent.occurredMillis,
328       imEvent.receivedMillis,
329       imEvent.userID,
330       imEvent.clientID,
331       imEvent.eventType,
332       imEvent.eventType,
333       credits,
334       imEvent.eventVersion,
335       imEvent.details
336     )
337     //Console.err.println("Event: " + event)
338     //Console.err.println("Total credits before: " + _workingUserState.totalCredits)
339     onProcessResourceEvent(new ProcessResourceEvent(event))
340     //Console.err.println("Total credits after: " + _workingUserState.totalCredits)
341     //Console.err.println("OK.")
342   }
343
344   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
345     val rcEvent = event.rcEvent
346
347     if(!shouldProcessResourceEvents) {
348       // This means the user has not been created (at least, as far as Aquarium is concerned).
349       // So, we do not process any resource event
350       DEBUG("Not processing %s", rcEvent.toJsonString)
351       logSeparator()
352
353       return
354     }
355
356     // Since the latest resource event per resource is recorded in the user state,
357     // we do not need to query the store. Just query the in-memory state.
358     // Note: This is a similar situation with the first IMEvent received right after the user
359     //       actor is created.
360     if(this._latestResourceEventID == rcEvent.id) {
361       INFO("Ignoring first %s", rcEvent.toDebugString)
362       logSeparator()
363
364       return
365     }
366
367     val now = TimeHelpers.nowMillis()
368     val currentResourcesMap = aquarium.currentResourceTypesMap
369     val chargingReason = RealtimeChargingReason(None, now)
370
371     val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
372     val nowYear = nowBillingMonthInfo.year
373     val nowMonth = nowBillingMonthInfo.month
374
375     val eventOccurredMillis = rcEvent.occurredMillis
376     val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
377     val eventYear = eventBillingMonthInfo.year
378     val eventMonth = eventBillingMonthInfo.month
379
380     def computeBatch(): Unit = {
381       DEBUG("Going for out of sync charging")
382       this._workingUserState = chargingService.replayMonthChargingUpTo(
383         nowBillingMonthInfo,
384         // Take into account that the event may be out-of-sync.
385         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
386         now max eventOccurredMillis,
387         this._userStateBootstrap,
388         currentResourcesMap,
389         chargingReason,
390         stdUserStateStoreFunc,
391         None
392       )
393
394       updateLatestResourceEventIDFrom(rcEvent)
395     }
396
397     def computeRealtime(): Unit = {
398       DEBUG("Going for in sync charging")
399       chargingService.processResourceEvent(
400         rcEvent,
401         this._workingUserState,
402         chargingReason,
403         nowBillingMonthInfo,
404         None,
405         true
406       )
407
408       updateLatestResourceEventIDFrom(rcEvent)
409     }
410
411     var oldTotalCredits = this._workingUserState.totalCredits
412     // FIXME check these
413     if(nowYear != eventYear || nowMonth != eventMonth) {
414       DEBUG(
415         "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
416         nowYear, eventYear,
417         nowMonth, eventMonth
418       )
419       computeBatch()
420     }
421     else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
422       DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
423       DEBUG(
424         "%s < %s",
425         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
426         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
427       )
428       computeRealtime()
429     }
430     else {
431       computeBatch()
432     }
433     var newTotal = this._workingUserState.totalCredits
434     if(oldTotalCredits * newTotal < 0)
435       aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
436                                            newTotal>=0)
437     DEBUG("Updated %s", this._workingUserState)
438     logSeparator()
439   }
440
441   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
442     try{
443       val timeslot = event.timeslot
444       val state= if(haveWorkingUserState) Some(this._workingUserState) else None
445       val billEntry = BillEntry.fromWorkingUserState(timeslot,this._userID,state)
446       val billData = GetUserBillResponseData(this._userID,billEntry)
447       sender ! GetUserBillResponse(Right(billData))
448     } catch {
449       case e:Exception =>
450        e.printStackTrace()
451        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
452     }
453   }
454
455   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
456     val userID = event.userID
457
458     (haveUserCreationIMEvent, haveWorkingUserState) match {
459       case (true, true) ⇒
460         // (User CREATEd, with balance state)
461         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
462
463       case (true, false) ⇒
464         // (User CREATEd, no balance state)
465         // Return the default initial balance
466         sender ! GetUserBalanceResponse(
467           Right(
468             GetUserBalanceResponseData(
469               this._userID,
470               aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
471         )))
472
473       case (false, true) ⇒
474         // (Not CREATEd, with balance state)
475         // Clearly this is internal error
476         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
477
478       case (false, false) ⇒
479         // (Not CREATEd, no balance state)
480         // The user is completely unknown
481         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
482     }
483   }
484
485   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
486     haveWorkingUserState match {
487       case true ⇒
488         sender ! GetUserStateResponse(Right(this._workingUserState))
489
490       case false ⇒
491         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
492     }
493   }
494
495   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
496     haveWorkingUserState match {
497       case true ⇒
498         DEBUG("haveWorkingUserState: %s", event)
499         sender ! GetUserWalletResponse(
500           Right(
501             GetUserWalletResponseData(
502               this._userID,
503               this._workingUserState.totalCredits,
504               this._workingUserState.walletEntries.toList
505         )))
506
507       case false ⇒
508         DEBUG("!haveWorkingUserState: %s", event)
509         haveUserCreationIMEvent match {
510           case true ⇒
511             DEBUG("haveUserCreationIMEvent: %s", event)
512             sender ! GetUserWalletResponse(
513               Right(
514                 GetUserWalletResponseData(
515                   this._userID,
516                   aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
517                   Nil
518             )))
519
520           case false ⇒
521             DEBUG("!haveUserCreationIMEvent: %s", event)
522             sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
523         }
524     }
525   }
526
527   private[this] def D_userID = {
528     this._userID
529   }
530
531   private[this] def DEBUG(fmt: String, args: Any*) =
532     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
533
534   private[this] def INFO(fmt: String, args: Any*) =
535     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
536
537   private[this] def WARN(fmt: String, args: Any*) =
538     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
539
540   private[this] def ERROR(fmt: String, args: Any*) =
541     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
542
543   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
544     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
545 }