Cache resource mapping
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg}
56 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
57 import gr.grnet.aquarium.service.event.BalanceEvent
58 import gr.grnet.aquarium.util.date.TimeHelpers
59 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
60 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
61 import gr.grnet.aquarium.charging.bill.BillEntryMsg
62 import gr.grnet.aquarium.event.CreditsModel
63 import java.util
64
65 /**
66  *
67  * @author Christos KK Loverdos <loverdos@gmail.com>
68  */
69
70 class UserActor extends ReflectiveRoleableActor {
71   private[this] var _rcMsgCount = 0
72   private[this] var _imMsgCount = 0
73   private[this] var _userID: String = "???"
74   private[this] var _userStateMsg: UserStateMsg = _
75   private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
76
77   def unsafeUserID = {
78     if(!haveUserID) {
79       throw new AquariumInternalError("%s not initialized")
80     }
81
82     this._userID
83   }
84
85   override def postStop() {
86     DEBUG("I am finally stopped (in postStop())")
87     aquarium.akkaService.notifyUserActorPostStop(this)
88   }
89
90   private[this] def shutmedown(): Unit = {
91     if(haveUserID) {
92       aquarium.akkaService.invalidateUserActor(this)
93     }
94   }
95
96   override protected def onThrowable(t: Throwable, message: AnyRef) = {
97     LogHelpers.logChainOfCauses(logger, t)
98     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
99
100     shutmedown()
101   }
102
103   def role = UserActorRole
104
105   private[this] def chargingService = aquarium.chargingService
106
107   private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
108     aquarium.userStateStore.insertUserState(userState)
109   }
110
111   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
112   }
113
114   private[this] def haveUserID = this._userID ne null
115   private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent
116   private[this] def haveAgreements = this._userAgreementHistoryModel ne null
117   private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent
118   private[this] def haveUserState = this._userStateMsg ne null
119
120   private[this] def createInitialUserStateMsgFromCreateIMEvent() {
121     assert(haveAgreements, "haveAgreements")
122     assert(isUserCreated, "isUserCreated")
123     assert(this._userAgreementHistoryModel.hasUserCreationEvent, "this._userAgreementHistoryModel.hasUserCreationEvent")
124
125     val userCreationIMEventMsg = unsafeUserCreationIMEventMsg
126     val userStateBootstrap = aquarium.getUserStateBootstrap(userCreationIMEventMsg)
127
128     this._userStateMsg = MessageFactory.newInitialUserStateMsg(
129       this._userID,
130       CreditsModel.from(0.0),
131       TimeHelpers.nowMillis()
132     )
133   }
134
135   /**
136    * Creates the agreement history from all the stored IMEvents.
137    *
138    * @return (`true` iff there was a user CREATE event, the number of events processed)
139    */
140   private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = {
141     DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
142     val historyMsg = MessageFactory.newUserAgreementHistoryMsg(this._userID)
143     this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg)
144
145     var _imcounter = 0
146
147     val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
148       _imcounter += 1
149       DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
150
151       if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
152         // The very first event must be a CREATE event. Otherwise we abort initialization.
153         // This will normally happen during devops :)
154         INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
155         false
156       }
157       else {
158         val effectiveFromMillis = imEvent.getOccurredMillis
159         val role = imEvent.getRole
160         // calling unsafe just for the side-effect
161         assert(
162           aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
163           "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
164         )
165
166         this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
167         true
168       }
169     }
170
171     DEBUG("Agreements: %s", this._userAgreementHistoryModel)
172     (hadCreateEvent, _imcounter)
173   }
174
175   /**
176    * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
177    * messaging hub (rabbitmq).
178    */
179   def onIMEventMsg(imEvent: IMEventMsg) {
180     if(!isUserCreated && MessageHelpers.isIMEventCreate(imEvent)) {
181       assert(this._imMsgCount == 0, "this._imMsgCount == 0")
182       // Create the full agreement history from the original sources (IMEvents)
183       val (userCreated, imEventsCount) = createUserAgreementHistoryFromStoredIMEvents()
184
185       this._imMsgCount = imEventsCount
186       return
187     }
188
189     // Check for out of sync (regarding IMEvents)
190     val isOutOfSyncIM = imEvent.getOccurredMillis < this._userAgreementHistoryModel.latestIMEventOccurredMillis
191     if(isOutOfSyncIM) {
192       // clear all resource state
193       // FIXME implement
194
195       return
196     }
197
198     // Check out of sync (regarding ResourceEvents)
199     val isOutOfSyncRC = false // FIXME implement
200     if(isOutOfSyncRC) {
201       // TODO
202
203       return
204     }
205
206     // OK, seems good
207     assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
208
209     // Make new agreement
210     this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
211     this._imMsgCount += 1
212     DEBUG("Agreements: %s", this._userAgreementHistoryModel)
213   }
214
215   def onResourceEventMsg(rcEvent: ResourceEventMsg) {
216     if(!isUserCreated) {
217       DEBUG("No agreements. Ignoring %s", rcEvent)
218
219       return
220     }
221
222     val now = TimeHelpers.nowMillis()
223     val resourceMapping = aquarium.resourceMappingAtMillis(now)
224
225     val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
226     val nowYear = nowBillingMonthInfo.year
227     val nowMonth = nowBillingMonthInfo.month
228
229     val eventOccurredMillis = rcEvent.getOccurredMillis
230     val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
231     val eventYear = eventBillingMonthInfo.year
232     val eventMonth = eventBillingMonthInfo.month
233
234     def computeBatch(): Unit = {
235       DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
236
237       this._userStateMsg = chargingService.replayMonthChargingUpTo(
238         this._userAgreementHistoryModel,
239         nowBillingMonthInfo,
240         // Take into account that the event may be out-of-sync.
241         // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
242         now max eventOccurredMillis,
243         resourceMapping,
244         stdUserStateStoreFunc
245       )
246
247     }
248
249     def computeRealtime(): Unit = {
250       DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
251       chargingService.processResourceEvent(
252         rcEvent,
253         this._userAgreementHistoryModel,
254         this._userStateMsg,
255         nowBillingMonthInfo,
256         true,
257         resourceMapping
258       )
259
260       this._rcMsgCount += 1
261     }
262
263     val oldTotalCredits =
264       if(this._userStateMsg!=null)
265         this._userStateMsg.totalCredits
266       else
267         0.0D
268     // FIXME check these
269     if(this._userStateMsg eq null) {
270       computeBatch()
271     }
272     else if(nowYear != eventYear || nowMonth != eventMonth) {
273       DEBUG(
274         "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
275         nowYear, eventYear,
276         nowMonth, eventMonth
277       )
278       computeBatch()
279     }
280     else if(this._userStateMsg.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
281       DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
282       DEBUG(
283         "%s < %s",
284         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
285         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
286       )
287       computeRealtime()
288     }
289     else {
290       DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s  and rcEvent.occurredMillis=%s",
291         TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
292         TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
293
294       computeBatch()
295     }
296     val newTotalCredits = this._userStateMsg.totalCredits
297     if(oldTotalCredits * newTotalCredits < 0)
298       aquarium.eventBus ! new BalanceEvent(this._userStateMsg.userID,
299         newTotalCredits>=0)
300     DEBUG("Updated %s", this._userStateMsg)
301     logSeparator()
302   }
303
304   def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
305     try{
306       val timeslot = event.timeslot
307       val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
308                           loadSortedPolicyModelsWithin(timeslot.from.getTime,
309                                                        timeslot.to.getTime).
310                           values.headOption match {
311           case None => Map[String,ResourceType]()
312           case Some(policy:PolicyModel) => policy.resourceTypesMap
313       }
314       val state= if(haveUserState) Some(this._userStateMsg) else None
315       val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes)
316       //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
317       //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
318       val billData = GetUserBillResponseData(this._userID,billEntryMsg)
319       sender ! GetUserBillResponse(Right(billData))
320     } catch {
321       case e:Exception =>
322         e.printStackTrace()
323         sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
324     }
325   }
326
327   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
328     val userID = event.userID
329
330     (haveAgreements, haveUserState) match {
331       case (true, true) ⇒
332         // (User CREATEd, with balance state)
333         val realtimeMillis = TimeHelpers.nowMillis()
334         chargingService.calculateRealtimeUserState(
335           this._userAgreementHistoryModel,
336           this._userStateMsg,
337           BillingMonthInfo.fromMillis(realtimeMillis),
338           aquarium.resourceMappingAtMillis(realtimeMillis),
339           realtimeMillis
340         )
341
342         sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userStateMsg.totalCredits)))
343
344       case (true, false) ⇒
345         // (User CREATEd, no balance state)
346         // Return the default initial balance
347         sender ! GetUserBalanceResponse(
348           Right(
349             GetUserBalanceResponseData(
350               this._userID,
351               aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)
352             )))
353
354       case (false, true) ⇒
355         // (Not CREATEd, with balance state)
356         // Clearly this is internal error
357         sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
358
359       case (false, false) ⇒
360         // (Not CREATEd, no balance state)
361         // The user is completely unknown
362         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
363     }
364   }
365
366   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
367     haveUserState match {
368       case true ⇒
369         val realtimeMillis = TimeHelpers.nowMillis()
370         chargingService.calculateRealtimeUserState(
371           this._userAgreementHistoryModel,
372           this._userStateMsg,
373           BillingMonthInfo.fromMillis(realtimeMillis),
374           aquarium.resourceMappingAtMillis(realtimeMillis),
375           realtimeMillis
376         )
377
378         sender ! GetUserStateResponse(Right(this._userStateMsg))
379
380       case false ⇒
381         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
382     }
383   }
384
385   def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
386     haveUserState match {
387       case true ⇒
388         DEBUG("haveWorkingUserState: %s", event)
389         val realtimeMillis = TimeHelpers.nowMillis()
390         chargingService.calculateRealtimeUserState(
391           this._userAgreementHistoryModel,
392           this._userStateMsg,
393           BillingMonthInfo.fromMillis(realtimeMillis),
394           aquarium.resourceMappingAtMillis(realtimeMillis),
395           realtimeMillis
396         )
397
398         sender ! GetUserWalletResponse(
399           Right(
400             GetUserWalletResponseData(
401               this._userID,
402               this._userStateMsg.totalCredits,
403               MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries)
404             )))
405
406       case false ⇒
407         DEBUG("!haveWorkingUserState: %s", event)
408         haveAgreements match {
409           case true ⇒
410             DEBUG("haveAgreements: %s", event)
411             sender ! GetUserWalletResponse(
412               Right(
413                 GetUserWalletResponseData(
414                   this._userID,
415                   aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis),
416                   MessageFactory.newWalletEntriesMsg()
417                 )))
418
419           case false ⇒
420             DEBUG("!haveUserCreationIMEvent: %s", event)
421             sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
422         }
423     }
424   }
425
426   def onSetUserActorUserID(userID: String) {
427     this._userID = userID
428   }
429
430   private[this] def D_userID = {
431     this._userID
432   }
433
434   private[this] def DEBUG(fmt: String, args: Any*) =
435     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
436
437   private[this] def INFO(fmt: String, args: Any*) =
438     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
439
440   private[this] def WARN(fmt: String, args: Any*) =
441     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
442
443   private[this] def ERROR(fmt: String, args: Any*) =
444     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
445
446   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
447     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
448 }