Added documentation to CronSpec and TimeslotComputations. Fixed a bug in ContinuousCh...
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.util.date.TimeHelpers
41 import gr.grnet.aquarium.service.event.BalanceEvent
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
44 import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
45 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
46 import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.charging.state.{WorkingUserState, UserStateModel}
51 import gr.grnet.aquarium.policy.PolicyDefinedFullPriceTableRef
52 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
53 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
54 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
55 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
56 import gr.grnet.aquarium.actor.message.GetUserStateRequest
57 import gr.grnet.aquarium.actor.message.GetUserStateResponse
58 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
59 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
60 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
61 import gr.grnet.aquarium.actor.message.GetUserBillRequest
62 import gr.grnet.aquarium.actor.message.GetUserBillResponse
63 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
64 import gr.grnet.aquarium.charging.state.WorkingAgreementHistory
65 import gr.grnet.aquarium.policy.StdUserAgreement
66 import gr.grnet.aquarium.charging.state.UserStateBootstrap
67 import gr.grnet.aquarium.charging.bill.{AbstractBillEntry, BillEntry}
68
69 /**
70  *
71  * @author Christos KK Loverdos <loverdos@gmail.com>
72  */
73
74 class UserActor extends ReflectiveRoleableActor {
75   private[this] var _userID: String = "<?>"
76   private[this] var _workingUserState: WorkingUserState = _
77   private[this] var _userCreationIMEvent: IMEventModel = _
78   private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
79   private[this] var _latestIMEventID: String = ""
80   private[this] var _latestResourceEventID: String = ""
81   private[this] var _userStateBootstrap: UserStateBootstrap = _
82
83   def unsafeUserID = {
84     if(!haveUserID) {
85       throw new AquariumInternalError("%s not initialized")
86     }
87
88     this._userID
89   }
90
91   override def postStop() {
92     DEBUG("I am finally stopped (in postStop())")
93     aquarium.akkaService.notifyUserActorPostStop(this)
94   }
95
96   private[this] def shutmedown(): Unit = {
97     if(haveUserID) {
98       aquarium.akkaService.invalidateUserActor(this)
99     }
100   }
101
102   override protected def onThrowable(t: Throwable, message: AnyRef) = {
103     LogHelpers.logChainOfCauses(logger, t)
104     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
105
106     shutmedown()
107   }
108
109   def role = UserActorRole
110
111   private[this] def chargingService = aquarium.chargingService
112
113   private[this] def stdUserStateStoreFunc = (userState: UserStateModel) ⇒ {
114     aquarium.userStateStore.insertUserState(userState)
115   }
116
117   @inline private[this] def haveUserID = {
118     this._userID ne null
119   }
120
121   @inline private[this] def haveUserCreationIMEvent = {
122     this._userCreationIMEvent ne null
123   }
124
125   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
126   }
127
128   @inline private[this] def haveAgreements = {
129     this._workingAgreementHistory.size > 0
130   }
131
132   @inline private[this] def haveWorkingUserState = {
133     this._workingUserState ne null
134   }
135
136   @inline private[this] def haveUserStateBootstrap = {
137     this._userStateBootstrap ne null
138   }
139
140   private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
141     if(imEvent.isCreateUser) {
142       if(haveUserCreationIMEvent) {
143         throw new AquariumInternalError(
144           "Got user creation event (id=%s) but I already have one (id=%s)",
145             this._userCreationIMEvent.id,
146             imEvent.id
147         )
148       }
149
150       this._userCreationIMEvent = imEvent
151     }
152
153     val effectiveFromMillis = imEvent.occurredMillis
154     val role = imEvent.role
155     // calling unsafe just for the side-effect
156     assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
157
158     val newAgreement = StdUserAgreement(
159       imEvent.id,
160       Some(imEvent.id),
161       effectiveFromMillis,
162       Long.MaxValue,
163       role,
164       PolicyDefinedFullPriceTableRef()
165     )
166
167     this._workingAgreementHistory += newAgreement
168   }
169
170   private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
171     this._latestIMEventID = imEvent.id
172   }
173
174   private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
175     this._latestResourceEventID = rcEvent.id
176   }
177
178   /**
179    * Creates the initial state that is related to IMEvents.
180    */
181   private[this] def initializeStateOfIMEvents(): Unit = {
182     // NOTE: this._userID is already set up by onInitializeUserActorState()
183     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
184       DEBUG("Replaying %s", imEvent)
185
186       updateAgreementHistoryFrom(imEvent)
187       updateLatestIMEventIDFrom(imEvent)
188     }
189
190     if(haveAgreements) {
191       DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
192       logSeparator()
193     }
194   }
195
196   /**
197    * Resource events are processed only if the user has been created and has agreements.
198    * Otherwise nothing can be computed.
199    */
200   private[this] def shouldProcessResourceEvents: Boolean = {
201     haveUserCreationIMEvent && haveAgreements && haveUserStateBootstrap
202   }
203
204   private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
205     assert(this.haveAgreements, "this.haveAgreements")
206     assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
207
208     val userCreationMillis = this._userCreationIMEvent.occurredMillis
209     val userCreationRole = this._userCreationIMEvent.role // initial role
210     val userCreationIMEventID = this._userCreationIMEvent.id
211
212     if(!haveUserStateBootstrap) {
213       this._userStateBootstrap = UserStateBootstrap(
214         this._userID,
215         userCreationMillis,
216         aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
217         aquarium.initialUserBalance(userCreationRole, userCreationMillis)
218       )
219     }
220
221     val now = TimeHelpers.nowMillis()
222     this._workingUserState = chargingService.replayMonthChargingUpTo(
223       BillingMonthInfo.fromMillis(now),
224       now,
225       this._userStateBootstrap,
226       aquarium.currentResourceTypesMap,
227       aquarium.userStateStore.insertUserState
228     )
229
230     // Final touch: Update agreement history in the working user state.
231     // The assumption is that all agreement changes go via IMEvents, so the
232     // state this._workingAgreementHistory is always the authoritative source.
233     if(haveWorkingUserState) {
234       this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
235       DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
236     }
237   }
238
239   private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
240     if(!this.haveAgreements) {
241       DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
242       return
243     }
244
245     if(!this.haveUserCreationIMEvent) {
246       DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
247       return
248     }
249
250     // We will also need this functionality when receiving IMEvents, so we place it in a method
251     loadWorkingUserStateAndUpdateAgreementHistory()
252
253     if(haveWorkingUserState) {
254       DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
255       logSeparator()
256     }
257   }
258
259   def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
260     this._userID = event.userID
261     DEBUG("Got %s", event)
262
263     initializeStateOfIMEvents()
264     initializeStateOfResourceEvents(event)
265   }
266
267   /**
268    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
269    * When this method is called, we assume that all proper checks have been made and it
270    * is OK to proceed with the event processing.
271    */
272   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
273     val imEvent = processEvent.imEvent
274     val hadUserCreationIMEvent = haveUserCreationIMEvent
275
276     if(!haveAgreements) {
277       // This IMEvent has arrived after any ResourceEvents
278       INFO("Arrived after any ResourceEvent: %s", imEvent.toDebugString)
279       initializeStateOfIMEvents()
280     }
281     else {
282       if(this._latestIMEventID == imEvent.id) {
283         // This happens when the actor is brought to life, then immediately initialized, and then
284         // sent the first IM event. But from the initialization procedure, this IM event will have
285         // already been loaded from DB!
286         INFO("Ignoring first %s", imEvent.toDebugString)
287         logSeparator()
288
289         //this._latestIMEventID = imEvent.id
290         return
291       }
292       if(imEvent.isAddCredits)  {
293         if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
294         loadWorkingUserStateAndUpdateAgreementHistory()
295         onHandleAddCreditsEvent(imEvent)
296
297       } else {
298       updateAgreementHistoryFrom(imEvent)
299       updateLatestIMEventIDFrom(imEvent)
300         //Thread.sleep(3000)
301       }
302     }
303
304     // Must also update user state if we know when in history the life of a user begins
305     if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
306       INFO("Processing user state, since we had a CREATE IMEvent")
307       loadWorkingUserStateAndUpdateAgreementHistory()
308     }
309
310     logSeparator()
311   }
312
313   /* Convert astakos message for adding credits
314     to a regular RESOURCE message */
315   def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
316     DEBUG("Got %s", imEvent.toJsonString)
317
318     val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
319     val event = new StdResourceEvent(
320       imEvent.id,
321       imEvent.occurredMillis,
322       imEvent.receivedMillis,
323       imEvent.userID,
324       imEvent.clientID,
325       imEvent.eventType,
326       imEvent.eventType,
327       credits,
328       imEvent.eventVersion,
329       imEvent.details
330     )
331     DEBUG("Transformed to %s", event)
332     DEBUG("Total credits before: %s", _workingUserState.totalCredits)
333     aquarium.resourceEventStore.insertResourceEvent(event)
334     onProcessResourceEvent(new ProcessResourceEvent(event))
335     DEBUG("Total credits after: %s", _workingUserState.totalCredits)
336     //Console.err.println("OK.")
337   }
338
339   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
340     val rcEvent = event.rcEvent
341
342     if(!shouldProcessResourceEvents) {
343       // This means the user has not been created (at least, as far as Aquarium is concerned).
344       // So, we do not process any resource event
345       DEBUG("Not processing %s", rcEvent.toJsonString)
346       logSeparator()
347
348       return
349     }
350
351     // Since the latest resource event per resource is recorded in the user state,
352     // we do not need to query the store. Just query the in-memory state.
353     // Note: This is a similar situation with the first IMEvent received right after the user
354     //       actor is created.
355     if(this._latestResourceEventID == rcEvent.id) {
356       INFO("Ignoring first %s", rcEvent.toDebugString)
357       logSeparator()
358
359       return
360     }
361
362     val now = TimeHelpers.nowMillis()
363     // TODO: Review this and its usage in user state.
364     // TODO: The assumption is that the resource set increases all the time,
365     // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
366     val currentResourcesMap = aquarium.currentResourceTypesMap
367
368     val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
369     val nowYear = nowBillingMonthInfo.year
370     val nowMonth = nowBillingMonthInfo.month
371
372     val eventOccurredMillis = rcEvent.occurredMillis
373     val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
374     val eventYear = eventBillingMonthInfo.year
375     val eventMonth = eventBillingMonthInfo.month
376
377     def computeBatch(): Unit = {
378       DEBUG("Going for out of sync charging")
379       this._workingUserState = chargingService.replayMonthChargingUpTo(
380         nowBillingMonthInfo,
381         // Take into account that the event may be out-of-sync.
382         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
383         now max eventOccurredMillis,
384         this._userStateBootstrap,
385         currentResourcesMap,
386         stdUserStateStoreFunc
387       )
388
389       updateLatestResourceEventIDFrom(rcEvent)
390     }
391
392     def computeRealtime(): Unit = {
393       DEBUG("Going for in sync charging")
394       chargingService.processResourceEvent(
395         rcEvent,
396         this._workingUserState,
397         nowBillingMonthInfo,
398         true
399       )
400
401       updateLatestResourceEventIDFrom(rcEvent)
402     }
403
404     val oldTotalCredits =
405       if(this._workingUserState!=null)
406         this._workingUserState.totalCredits
407       else
408         0.0D
409     // FIXME check these
410     if(nowYear != eventYear || nowMonth != eventMonth) {
411       DEBUG(
412         "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
413         nowYear, eventYear,
414         nowMonth, eventMonth
415       )
416       computeBatch()
417     }
418     else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
419       DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
420       DEBUG(
421         "%s < %s",
422         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
423         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
424       )
425       computeRealtime()
426     }
427     else {
428       computeBatch()
429     }
430     val newTotalCredits = this._workingUserState.totalCredits
431     if(oldTotalCredits * newTotalCredits < 0)
432       aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
433                                            newTotalCredits>=0)
434     DEBUG("Updated %s", this._workingUserState)
435     logSeparator()
436   }
437
438   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
439     try{
440       val timeslot = event.timeslot
441       val state= if(haveWorkingUserState) Some(this._workingUserState) else None
442       val billEntry = AbstractBillEntry.fromWorkingUserState(timeslot,this._userID,state)
443       val billData = GetUserBillResponseData(this._userID,billEntry)
444       sender ! GetUserBillResponse(Right(billData))
445     } catch {
446       case e:Exception =>
447        e.printStackTrace()
448        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
449     }
450   }
451
452   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
453     val userID = event.userID
454
455     (haveUserCreationIMEvent, haveWorkingUserState) match {
456       case (true, true) ⇒
457         // (User CREATEd, with balance state)
458         val realtimeMillis = TimeHelpers.nowMillis()
459         chargingService.calculateRealtimeWorkingUserState(
460           this._workingUserState,
461           BillingMonthInfo.fromMillis(realtimeMillis),
462           realtimeMillis
463         )
464
465         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
466
467       case (true, false) ⇒
468         // (User CREATEd, no balance state)
469         // Return the default initial balance
470         sender ! GetUserBalanceResponse(
471           Right(
472             GetUserBalanceResponseData(
473               this._userID,
474               aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
475         )))
476
477       case (false, true) ⇒
478         // (Not CREATEd, with balance state)
479         // Clearly this is internal error
480         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
481
482       case (false, false) ⇒
483         // (Not CREATEd, no balance state)
484         // The user is completely unknown
485         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
486     }
487   }
488
489   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
490     haveWorkingUserState match {
491       case true ⇒
492         val realtimeMillis = TimeHelpers.nowMillis()
493         chargingService.calculateRealtimeWorkingUserState(
494           this._workingUserState,
495           BillingMonthInfo.fromMillis(realtimeMillis),
496           realtimeMillis
497         )
498
499         sender ! GetUserStateResponse(Right(this._workingUserState))
500
501       case false ⇒
502         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
503     }
504   }
505
506   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
507     haveWorkingUserState match {
508       case true ⇒
509         DEBUG("haveWorkingUserState: %s", event)
510         val realtimeMillis = TimeHelpers.nowMillis()
511         chargingService.calculateRealtimeWorkingUserState(
512           this._workingUserState,
513           BillingMonthInfo.fromMillis(realtimeMillis),
514           realtimeMillis
515         )
516
517         sender ! GetUserWalletResponse(
518           Right(
519             GetUserWalletResponseData(
520               this._userID,
521               this._workingUserState.totalCredits,
522               this._workingUserState.walletEntries.toList
523         )))
524
525       case false ⇒
526         DEBUG("!haveWorkingUserState: %s", event)
527         haveUserCreationIMEvent match {
528           case true ⇒
529             DEBUG("haveUserCreationIMEvent: %s", event)
530             sender ! GetUserWalletResponse(
531               Right(
532                 GetUserWalletResponseData(
533                   this._userID,
534                   aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
535                   Nil
536             )))
537
538           case false ⇒
539             DEBUG("!haveUserCreationIMEvent: %s", event)
540             sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
541         }
542     }
543   }
544
545   private[this] def D_userID = {
546     this._userID
547   }
548
549   private[this] def DEBUG(fmt: String, args: Any*) =
550     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
551
552   private[this] def INFO(fmt: String, args: Any*) =
553     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
554
555   private[this] def WARN(fmt: String, args: Any*) =
556     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
557
558   private[this] def ERROR(fmt: String, args: Any*) =
559     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
560
561   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
562     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
563 }