eeb48219c017ee2e95c9ff29e384cf2660d79c0e
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg}
56 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
57 import gr.grnet.aquarium.service.event.BalanceEvent
58 import gr.grnet.aquarium.util.date.TimeHelpers
59 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
60 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
61 import gr.grnet.aquarium.charging.bill.BillEntryMsg
62
63 /**
64  *
65  * @author Christos KK Loverdos <loverdos@gmail.com>
66  */
67
68 class UserActor extends ReflectiveRoleableActor {
// Message counters. Within this class only _imMsgCount is incremented
// (in updateLatestIMEventStateFrom).
private[this] var _rcMsgCount: Int = 0
private[this] var _imMsgCount: Int = 0

// Starts as a printable placeholder; overwritten by initializeUserActorState().
private[this] var _userID: String = "<?>"

// Charging state and the bootstrap it is computed from; both start as null.
private[this] var _userState: UserStateModel = _
private[this] var _userStateBootstrap: UserStateBootstrap = _

// The user CREATE event and the agreement history built from IM events; both start as null.
private[this] var _userCreationIMEvent: IMEventMsg = _
private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _

// Bookkeeping about the most recently processed events.
private[this] var _latestIMEventOriginalID: String = ""
private[this] var _latestIMEventOccurredMillis: Long = -1L
private[this] var _latestResourceEventOriginalID: String = ""
79
/**
 * Returns the user ID currently held by this actor.
 *
 * @return the value of the internal user ID field
 * @throws AquariumInternalError if `haveUserID` reports that no user ID is available
 */
def unsafeUserID: String = {
  if(!haveUserID) {
    // The message used to be the bare format string "%s not initialized" with nothing
    // to substitute; use a self-contained message that needs no format arguments.
    throw new AquariumInternalError("userID not initialized")
  }

  this._userID
}
87
/**
 * Lifecycle hook: logs that the actor has stopped and then notifies the
 * Akka service about it.
 */
override def postStop(): Unit = {
  DEBUG("I am finally stopped (in postStop())")

  val akkaService = aquarium.akkaService
  akkaService.notifyUserActorPostStop(this)
}
92
/**
 * Asks the Akka service to invalidate this actor, but only when `haveUserID` holds.
 */
private[this] def shutmedown(): Unit = {
  if(haveUserID) aquarium.akkaService.invalidateUserActor(this)
}
98
/**
 * Error hook: logs the chain of causes of `t`, logs a termination message
 * and then triggers `shutmedown()`.
 *
 * @param t       the failure that was raised
 * @param message the message associated with the failure (not used here)
 */
override protected def onThrowable(t: Throwable, message: AnyRef) = {
  LogHelpers.logChainOfCauses(logger, t)

  val failureName = shortClassNameOf(t)
  val failureMessage = t.getMessage
  ERROR(t, "Terminating due to: %s(%s)", failureName, failureMessage)

  shutmedown()
}
105
/** The role of this actor, always `UserActorRole`. */
def role = {
  UserActorRole
}
107
/** Shortcut to the charging service exposed by `aquarium`. */
private[this] def chargingService = {
  aquarium.chargingService
}
109
/** A function that inserts the given user state message into the user state store. */
private[this] def stdUserStateStoreFunc =
  (stateMsg: UserStateMsg) ⇒ aquarium.userStateStore.insertUserState(stateMsg)
113
/**
 * `true` when the internal user ID field is not null.
 *
 * NOTE(review): the field is initialised with the placeholder "<?>" rather than null,
 * so this only yields `false` if a null ID is explicitly assigned — confirm whether
 * the placeholder should count as "no user ID".
 */
@inline private[this] def haveUserID = null ne this._userID
117
/**
 * Handler for [[gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded]].
 * Intentionally does nothing.
 */
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = ()
120
/** `true` when an agreement history exists and has at least one entry. */
@inline private[this] def haveAgreements = {
  val history = this._userAgreementHistoryModel
  (history ne null) && history.size > 0
}
124
/** `true` when a user state has been computed (the field is not null). */
@inline private[this] def haveUserState = null ne this._userState
128
/** `true` when the user state bootstrap has been set (the field is not null). */
@inline private[this] def haveUserStateBootstrap = null ne this._userStateBootstrap
132
/**
 * Records `imEvent` as the user creation event and builds the agreement
 * history model from it.
 *
 * Asserts that the event is a CREATE event and that neither the history
 * model nor the creation event has been set before.
 *
 * @param imEvent the user CREATE event
 */
private[this] def createUserAgreementHistoryModel(imEvent: IMEventMsg): Unit = {
  // Preconditions
  assert(MessageHelpers.isIMEventCreate(imEvent))
  assert(this._userAgreementHistoryModel eq null)
  assert(this._userCreationIMEvent eq null)

  val creationEventID = imEvent.getOriginalID
  this._userCreationIMEvent = imEvent
  this._userAgreementHistoryModel =
    ModelFactory.newUserAgreementHistoryModelFromIMEvent(imEvent, creationEventID)
}
144
/**
 * Folds one IM event into the agreement history.
 *
 * For a CREATE event the agreement history model and the user state bootstrap are
 * created first. For every event a new agreement model is appended to the history
 * and the latest-IM-event bookkeeping is updated.
 *
 * @param imEvent the event to apply; expected to be in sync, i.e. not older than
 *                the latest IM event seen so far (asserted)
 * @throws AquariumInternalError if a CREATE event arrives while agreements already exist
 */
private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = {
  val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
  if(isCreateUser) {
    if(haveAgreements) {
      // First placeholder: the event we just got. Second placeholder: the one we already have.
      // (These two arguments used to be passed the other way round.)
      throw new AquariumInternalError(
        "Got user creation event (id=%s) but I already have one (id=%s)",
        imEvent.getOriginalID,
        this._userCreationIMEvent.getOriginalID
      )
    }

    createUserAgreementHistoryModel(imEvent) // now we have an agreement history
    createUserStateBootstrap(imEvent)
  }

  val effectiveFromMillis = imEvent.getOccurredMillis
  val role = imEvent.getRole
  // Calling the unsafe lookup just for its side-effect; the result itself is not used.
  assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis))

  // Add to the model (the original comment states this updates the underlying messages as well)
  val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
  this._userAgreementHistoryModel += newUserAgreementModel

  // We assume that we always call this method with in-sync events
  assert(imEvent.getOccurredMillis >= this._latestIMEventOccurredMillis)
  updateLatestIMEventStateFrom(imEvent)
}
173
174 //  private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
175 //    this._latestIMEventOriginalID = imEvent.getOriginalID
176 //  }
177
/**
 * Remembers the ID and occurrence time of `imEvent` as the latest IM event
 * and bumps the IM message counter.
 */
private[this] def updateLatestIMEventStateFrom(imEvent: IMEventMsg): Unit = {
  val eventID = imEvent.getOriginalID
  val occurredMillis = imEvent.getOccurredMillis

  this._latestIMEventOriginalID = eventID
  this._latestIMEventOccurredMillis = occurredMillis
  this._imMsgCount += 1
}
183
/** Remembers the ID of `rcEvent` as the latest resource event. */
private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = {
  val eventID = rcEvent.getOriginalID
  this._latestResourceEventOriginalID = eventID
}
187
/**
 * Creates the initial state that is related to IMEvents by replaying the
 * stored IM events of the current user in occurrence order.
 *
 * The per-event callback answers `false` only when the very first replayed
 * event is not a CREATE event; in every other case the event is folded into
 * the agreement history and the callback answers `true`.
 *
 * @return the result of the replay as reported by the IM event store;
 *         the original documentation describes it as `true` if there was
 *         a user CREATE event
 */
private[this] def initializeStateOfIMEvents(): Boolean = {
  DEBUG("initializeStateOfIMEvents()")

  // NOTE: this._userID has already been set up by our caller
  var replayedCount = 0

  aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
    replayedCount += 1
    DEBUG("Replaying [%s] %s", replayedCount, imEvent)

    val isFirstEvent = replayedCount == 1
    if(!isFirstEvent || MessageHelpers.isIMEventCreate(imEvent)) {
      updateAgreementHistoryFrom(imEvent)
      true
    }
    else {
      // The very first event must be a CREATE event, otherwise initialization is aborted.
      INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
      false
    }
  }
}
215
/**
 * Computes the user state by replaying the charging of the current billing
 * month up to now, then attaches the agreement history to that state.
 *
 * Requires that agreements exist (asserted). The user state bootstrap is
 * obtained from the creation event if it has not been set yet.
 */
private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = {
  assert(this.haveAgreements, "this.haveAgreements")

  if(this._userStateBootstrap eq null) {
    this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
  }
  logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString))

  val nowMillis = TimeHelpers.nowMillis()
  val billingMonthInfo = BillingMonthInfo.fromMillis(nowMillis)
  this._userState = chargingService.replayMonthChargingUpTo(
    billingMonthInfo,
    nowMillis,
    this._userStateBootstrap,
    aquarium.currentResourceTypesMap,
    aquarium.userStateStore.insertUserState
  )

  // Final touch: the agreement history kept by this actor is treated as the
  // authoritative one (the assumption being that all agreement changes arrive
  // via IMEvents), so it is written into the freshly computed state.
  if(this._userState ne null) {
    this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel

    val stateJson = AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)
    DEBUG("Computed working user state %s", stateJson)
  }
}
240
/**
 * Creates the initial state that is related to resource events: loads the
 * user state and, if one was produced, logs it.
 *
 * Requires that agreements exist (asserted).
 */
private[this] def initializeStateOfResourceEvents(): Unit = {
  DEBUG("initializeStateOfResourceEvents()")
  assert(haveAgreements)

  // The same functionality is needed when receiving IMEvents, hence the separate method
  loadUserStateAndUpdateAgreementHistory()

  if(this._userState ne null) {
    val stateJson = AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg)
    DEBUG("Initial working user state %s", stateJson)
    logSeparator()
  }
}
253
/**
 * Initializes the actor state from DB.
 *
 * Sets the user ID, replays the IM events and, only if that replay succeeds,
 * initializes the resource-event related state as well.
 *
 * @param userID the ID of the user this actor will serve
 * @return the outcome of the IM event replay
 */
def initializeUserActorState(userID: String): Boolean = {
  this._userID = userID

  val imStateInitialized = initializeStateOfIMEvents()
  if(imStateInitialized) {
    // Even if we have no resource events, the user is at least CREATEd
    initializeStateOfResourceEvents()
  }

  imStateInitialized
}
269
/**
 * Obtains the user state bootstrap for the stored user creation event.
 *
 * Asserts that `imEvent` is a CREATE event and that it equals the stored
 * creation event.
 *
 * @param imEvent the user CREATE event
 */
def createUserStateBootstrap(imEvent: IMEventMsg): Unit = {
  assert(MessageHelpers.isIMEventCreate(imEvent), "MessageHelpers.isIMEventCreate(imEvent)")
  assert(this._userCreationIMEvent == imEvent, "this._userCreationIMEvent == imEvent")

  val creationEvent = this._userCreationIMEvent
  this._userStateBootstrap = aquarium.getUserStateBootstrap(creationEvent)
}
276
/**
 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
 * messaging hub (rabbitmq).
 *
 * Without any agreements the whole actor state is (re)initialized from the store.
 * Otherwise an in-sync, non-CREATE event is folded into the agreement history;
 * events older than the latest known IM event are currently ignored.
 */
def onIMEventMsg(imEvent: IMEventMsg): Unit = {
  if(haveAgreements) {
    // Out of sync regarding IMEvents?
    // FIXME implement: clear all resource state in that case
    val isOutOfSyncIM = imEvent.getOccurredMillis < this._latestIMEventOccurredMillis

    // Out of sync regarding ResourceEvents?
    val isOutOfSyncRC = false // FIXME implement

    if(!isOutOfSyncIM && !isOutOfSyncRC) {
      // OK, seems good
      assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
      updateAgreementHistoryFrom(imEvent)
    }
  }
  else {
    // With no agreements so far, the kind of event does not matter:
    // replay the log (ehm.. store)
    initializeUserActorState(imEvent.getUserID)
  }
}
311
/**
 * Charges the user for an incoming resource event.
 *
 * The event is ignored when no agreements exist. Otherwise the user state is either
 * recomputed by replaying the current billing month ("batch") or updated incrementally
 * ("realtime"):
 *
 *  - batch, when there is no user state yet, when the event belongs to a billing month
 *    other than the current one, or when the event is not newer than the latest
 *    resource event recorded in the user state;
 *  - realtime otherwise.
 *
 * Afterwards a [[gr.grnet.aquarium.service.event.BalanceEvent]] is published whenever the
 * total credits moved across the non-negative/negative boundary.
 *
 * @param rcEvent the resource event to process
 */
def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
  if(!haveAgreements) {
    DEBUG("No agreement. Ignoring %s", rcEvent)

    return
  }

  val now = TimeHelpers.nowMillis()
  // TODO: Review this and its usage in user state.
  // TODO: The assumption is that the resource set increases all the time,
  // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
  val currentResourcesMap = aquarium.currentResourceTypesMap

  val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
  val nowYear = nowBillingMonthInfo.year
  val nowMonth = nowBillingMonthInfo.month

  val eventOccurredMillis = rcEvent.getOccurredMillis
  val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
  val eventYear = eventBillingMonthInfo.year
  val eventMonth = eventBillingMonthInfo.month

  // Recomputes the user state by replaying the charging of the current month.
  def computeBatch(): Unit = {
    DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
    this._userState = chargingService.replayMonthChargingUpTo(
      nowBillingMonthInfo,
      // Take into account that the event may be out-of-sync.
      // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
      now max eventOccurredMillis,
      this._userStateBootstrap,
      currentResourcesMap,
      stdUserStateStoreFunc
    )

    updateLatestResourceEventIDFrom(rcEvent)
  }

  // Applies just this event on top of the existing user state.
  def computeRealtime(): Unit = {
    DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
    chargingService.processResourceEvent(
      rcEvent,
      this._userState,
      nowBillingMonthInfo,
      true
    )

    updateLatestResourceEventIDFrom(rcEvent)
  }

  // With no prior state the previous balance is taken to be zero.
  val oldTotalCredits = if(haveUserState) this._userState.totalCredits else 0.0D

  // FIXME check these
  if(!haveUserState) {
    computeBatch()
  }
  else if(nowYear != eventYear || nowMonth != eventMonth) {
    DEBUG(
      "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
      nowYear, eventYear,
      nowMonth, eventMonth
    )
    computeBatch()
  }
  else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
    DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
    DEBUG(
      "%s < %s",
      TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
      TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
    )
    computeRealtime()
  }
  else {
    DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
      TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
      TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))

    computeBatch()
  }

  val newTotalCredits = this._userState.totalCredits

  // Compare the two balances against the same threshold that the event payload uses (>= 0).
  // The previous test, old * new < 0, is never true when either balance is exactly zero,
  // so e.g. a drop from 0 to a negative balance published nothing.
  val wasNonNegative = oldTotalCredits >= 0
  val isNonNegative = newTotalCredits >= 0
  if(wasNonNegative != isNonNegative) {
    aquarium.eventBus ! new BalanceEvent(this._userState.userID, isNonNegative)
  }

  DEBUG("Updated %s", this._userState)
  logSeparator()
}
401
/**
 * Answers a bill request for the timeslot carried by `event`.
 *
 * The resource types are taken from the first policy model found within the timeslot
 * (an empty map if there is none). The bill is computed from the current user state,
 * if any. On success the sender gets a `Right` with the bill data; on any exception
 * the failure is logged and the sender gets a `Left` with status 500.
 *
 * @param event the bill request
 */
def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
  try {
    val timeslot = event.timeslot
    val resourceTypes = aquarium.policyStore.
      loadSortedPolicyModelsWithin(timeslot.from.getTime, timeslot.to.getTime).
      values.headOption match {
        case None => Map[String,ResourceType]()
        case Some(policy:PolicyModel) => policy.resourceTypesMap
      }

    val state = if(haveUserState) Some(this._userState.msg) else None
    val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot, this._userID, state, resourceTypes)
    val billData = GetUserBillResponseData(this._userID, billEntryMsg)

    sender ! GetUserBillResponse(Right(billData))
  } catch {
    case e: Exception =>
      // Report through the actor's logger (tagged with the user ID) instead of
      // dumping the stack trace on stderr.
      ERROR(e, "Could not compute bill for %s", event)
      sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
  }
}
424
/**
 * Answers a balance request.
 *
 *  - User CREATEd, with balance state: the state is brought up to realtime and its
 *    total credits are returned.
 *  - User CREATEd, no balance state: the initial balance for the creation event's
 *    role and time is returned.
 *  - Not CREATEd but with balance state: internal error (500).
 *  - Not CREATEd, no balance state: unknown user (404).
 *
 * @param event the balance request
 */
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
  val userID = event.userID
  val userIsCreated = haveAgreements
  val balanceStateExists = haveUserState

  if(userIsCreated && balanceStateExists) {
    // (User CREATEd, with balance state)
    val nowMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userState,
      BillingMonthInfo.fromMillis(nowMillis),
      nowMillis
    )

    val balanceData = GetUserBalanceResponseData(this._userID, this._userState.totalCredits)
    sender ! GetUserBalanceResponse(Right(balanceData))
  }
  else if(userIsCreated) {
    // (User CREATEd, no balance state): return the default initial balance
    val creationEvent = this._userCreationIMEvent
    val initialBalance = aquarium.initialUserBalance(creationEvent.getRole, creationEvent.getOccurredMillis)

    sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, initialBalance)))
  }
  else if(balanceStateExists) {
    // (Not CREATEd, with balance state): clearly this is an internal error
    sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
  }
  else {
    // (Not CREATEd, no balance state): the user is completely unknown
    sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
  }
}
461
/**
 * Answers a user state request: with the state brought up to realtime if a
 * user state exists, otherwise with a 404 error.
 *
 * @param event the user state request
 */
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
  if(haveUserState) {
    val nowMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userState,
      BillingMonthInfo.fromMillis(nowMillis),
      nowMillis
    )

    sender ! GetUserStateResponse(Right(this._userState.msg))
  }
  else {
    sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
  }
}
478
/**
 * Answers a wallet request.
 *
 *  - With a user state: the state is brought up to realtime; its total credits and
 *    wallet entries are returned.
 *  - Without a user state but with agreements: the initial balance for the creation
 *    event's role and time is returned together with an empty wallet.
 *  - Otherwise: a 404 error.
 *
 * @param event the wallet request
 */
def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
  if(haveUserState) {
    DEBUG("haveWorkingUserState: %s", event)
    val realtimeMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userState,
      BillingMonthInfo.fromMillis(realtimeMillis),
      realtimeMillis
    )

    sender ! GetUserWalletResponse(
      Right(
        GetUserWalletResponseData(
          this._userID,
          this._userState.totalCredits,
          MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries)
        )))
  }
  else {
    DEBUG("!haveWorkingUserState: %s", event)

    if(haveAgreements) {
      DEBUG("haveAgreements: %s", event)
      sender ! GetUserWalletResponse(
        Right(
          GetUserWalletResponseData(
            this._userID,
            aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis),
            MessageFactory.newWalletEntriesMsg()
          )))
    }
    else {
      DEBUG("!haveUserCreationIMEvent: %s", event)
      // The error code used to contain a stray space ("AQU-WAL-00 8"); it now follows the
      // four-digit shape of the other codes in this class (e.g. AQU-BAL-0004, AQU-STA-0006).
      sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
    }
  }
}
517
/** The user ID as used in the prefix of this actor's log messages. */
private[this] def D_userID = this._userID
521
/** Logs at debug level; the formatted message is prefixed with the user ID. */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(D_userID, text))
}
524
/** Logs at info level; the formatted message is prefixed with the user ID. */
private[this] def INFO(fmt: String, args: Any*) = {
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(D_userID, text))
}
527
/** Logs at warn level; the formatted message is prefixed with the user ID. */
private[this] def WARN(fmt: String, args: Any*) = {
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(D_userID, text))
}
530
/** Logs at error level; the formatted message is prefixed with the user ID. */
private[this] def ERROR(fmt: String, args: Any*) = {
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(D_userID, text))
}
533
/**
 * Logs at error level together with the throwable `t`; the formatted message
 * is prefixed with the user ID.
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(D_userID, text), t)
}
536 }