Return the number of resource events processed
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.{Real, AquariumInternalError}
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.UserStateModel
54 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
55 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
56 import gr.grnet.aquarium.service.event.BalanceEvent
57 import gr.grnet.aquarium.util.date.TimeHelpers
58 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
59 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
60 import gr.grnet.aquarium.charging.bill.BillEntryMsg
61
/**
 * Actor that holds the charging state of a single user.
 *
 * The user is assigned through `onSetUserActorUserID`. As long as the state model
 * reports `isInitial`, the first message handled afterwards triggers a replay of the
 * stored IMEvents (see `checkInitial`). The actor then serves IM events, resource
 * events and the balance / bill / state / wallet queries.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
class UserActor extends ReflectiveRoleableActor {
  // Number of IMEvents seen so far (replayed from the store plus received live).
  private[this] var _imMsgCount = 0
  // Per-user state; remains null until onSetUserActorUserID() has been handled.
  private[this] var _userStateModel: UserStateModel = _

  /**
   * The ID of the user this actor is responsible for.
   *
   * Throws an `AquariumInternalError` if no user state has been set yet.
   */
  def userID = {
    if(!haveUserState) {
      // Format eagerly; previously the raw "%s" placeholder leaked into the message.
      throw new AquariumInternalError("%s not initialized".format(shortClassNameOf(this)))
    }

    this._userStateModel.userID
  }

  /** Logs the stop and notifies the Akka service that this actor has stopped. */
  override def postStop(): Unit = {
    DEBUG("I am finally stopped (in postStop())")
    aquarium.akkaService.notifyUserActorPostStop(this)
  }

  /** Asks the Akka service to invalidate this actor, but only if a user state exists. */
  private[this] def shutmedown(): Unit = {
    if(haveUserState) {
      aquarium.akkaService.invalidateUserActor(this)
    }
  }

  /** Logs the failure (including its chain of causes) and then shuts this actor down. */
  override protected def onThrowable(t: Throwable, message: AnyRef): Unit = {
    LogHelpers.logChainOfCauses(logger, t)
    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)

    shutmedown()
  }

  def role = UserActorRole

  private[this] def chargingService = aquarium.chargingService

  /** Intentionally a no-op: this actor does not react to configuration (re)loading. */
  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
  }

  // Delegates to the model. NOTE(review): presumably fails when no CREATE event is known,
  // hence "unsafe" - confirm against UserStateModel.
  private[this] def unsafeUserCreationIMEventMsg = {
    this._userStateModel.unsafeUserCreationIMEvent
  }

  // Currently the very same predicate as haveUserState: both only check for a non-null model.
  private[this] def haveAgreements = {
    (this._userStateModel ne null)
  }

  private[this] def haveUserCreationEvent = {
    haveAgreements &&
    this._userStateModel.hasUserCreationEvent
  }

  private[this] def haveUserState = {
    (this._userStateModel ne null)
  }

  // Guarded by haveUserState so that handlers which call checkInitial() before the user ID
  // has been set reach their own "no state" branches instead of failing with a
  // NullPointerException.
  private[this] def isInitial = haveUserState && this._userStateModel.isInitial

  /**
   * Creates the agreement history from all the stored IMEvents.
   *
   * Also records the number of replayed events in `_imMsgCount`.
   *
   * @return (`true` iff there was a user CREATE event, the number of events processed)
   */
  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
    DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
    assert(haveUserState, "haveUserState")

    // Mutable on purpose: it is advanced from inside the store callback below.
    var _imcounter = 0

    // NOTE(review): the callback's Boolean result presumably tells the store whether to go on
    // with the replay - confirm against the IMEvent store API.
    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
      _imcounter += 1
      DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)

      if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
        // The very first event must be a CREATE event. Otherwise we abort initialization.
        // This will normally happen during devops :)
        INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
        false
      }
      else {
        val effectiveFromMillis = imEvent.getOccurredMillis
        val role = imEvent.getRole
        // calling unsafe just for the side-effect
        assert(
          aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
          "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
        )

        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
        true
      }
    }

    this._imMsgCount = _imcounter

    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
    (hadCreateEvent, _imcounter)
  }

  /**
   * Marks the current user state as the first one, stores it and adopts the stored version.
   *
   * @param userID currently unused; kept so that existing call sites stay valid
   */
  private[this] def saveFirstUserState(userID: String): Unit = {
    this._userStateModel.userStateMsg.setIsFirst(true)
    this._userStateModel.updateUserStateMsg(
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
    )
  }

  /** Marks the current user state as not being the first one, stores it and adopts the stored version. */
  private[this] def saveSubsequentUserState(): Unit = {
    this._userStateModel.userStateMsg.setIsFirst(false)
    this._userStateModel.updateUserStateMsg(
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
    )
  }

  /** Adopts the latest stored user state or, when none exists, stores the current one as the first. */
  private[this] def loadLastKnownUserStateAndUpdateAgreements(): Unit = {
    val userID = this._userStateModel.userID
    aquarium.userStateStore.findLatestUserState(userID) match {
      case None ⇒
        // First user state ever
        saveFirstUserState(userID)

      case Some(latestUserState) ⇒
        this._userStateModel.updateUserStateMsg(latestUserState)
    }
  }

  /** Placeholder: nothing is processed here yet. */
  private[this] def processResourceEventsAfterLastKnownUserState(): Unit = {
    // Update the user state snapshot with fresh (ie not previously processed) events.
  }

  private[this] def makeUserStateMsgUpToDate(): Unit = {
    loadLastKnownUserStateAndUpdateAgreements()
    processResourceEventsAfterLastKnownUserState()
  }

  /**
   * Performs the one-off initialization if the user state is still in its initial form:
   * replays the stored IMEvents and, if a user CREATE event was found, brings the user
   * state up to date.
   *
   * @param nextThing callback invoked after the initialization work; it is not invoked
   *                  when no initialization takes place
   * @return `true` iff the initialization work was performed by this call
   */
  private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
    if(!isInitial) {
      false
    }
    else {
      val (userCreated, _) = createUserAgreementHistoryFromIMEvents(userID)

      if(userCreated) {
        makeUserStateMsgUpToDate()
      }

      nextThing()

      true
    }
  }

  /**
   * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
   * messaging hub (rabbitmq).
   *
   * When this call triggers the initial replay of the stored IMEvents, the event is not
   * processed any further.
   */
  def onIMEventMsg(imEvent: IMEventMsg): Unit = {
    if(checkInitial()) {
      return
    }

    // Check for out of sync (regarding IMEvents)
    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
    if(isOutOfSyncIM) {
      // clear all resource state
      // FIXME implement

      return
    }

    // Check out of sync (regarding ResourceEvents)
    val isOutOfSyncRC = false // FIXME implement
    if(isOutOfSyncRC) {
      // TODO

      return
    }

    // Make new agreement
    this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
    this._imMsgCount += 1

    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
      makeUserStateMsgUpToDate()
    }

    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
  }

  /**
   * Charges the user for a resource event and publishes a `BalanceEvent` whenever the
   * balance switches between non-negative and negative.
   *
   * The event is ignored when this call triggers the initial replay or when no user
   * CREATE event is known.
   */
  def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
    if(checkInitial()) {
      return
    }

    if(!haveUserCreationEvent) {
      DEBUG("No agreements. Ignoring %s", rcEvent)

      return
    }

    assert(haveUserState, "haveUserState")

    val wasNonNegative = this._userStateModel.totalCreditsAsReal >= 0

    chargingService.processResourceEvent(
      rcEvent.getReceivedMillis,
      rcEvent,
      this._userStateModel,
      aquarium.currentResourceMapping,
      true
    )

    val isNonNegative = this._userStateModel.totalCreditsAsReal >= 0

    // Compare the very flag that is published. A product-of-signums test misses the
    // transitions that pass through exactly zero (e.g. from 0 to a negative balance).
    if(wasNonNegative != isNonNegative) {
      aquarium.eventBus ! new BalanceEvent(userID, isNonNegative)
    }

    DEBUG("Updated %s", this._userStateModel)
  }

  /**
   * Replies with the bill of the user for the requested timeslot, or with a 500 error
   * response if anything fails while the bill is being assembled.
   */
  def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
    checkInitial()

    try{
      val timeslot = event.timeslot
      // Resource types come from the first policy returned for the timeslot, if there is one.
      val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
                          loadSortedPolicyModelsWithin(timeslot.from.getTime,
                                                       timeslot.to.getTime).
                          values.headOption match {
          case None => Map[String,ResourceType]()
          case Some(policy:PolicyModel) => policy.resourceTypesMap
      }
      val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
      //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
      //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
      val billData = GetUserBillResponseData(this.userID,billEntryMsg)
      sender ! GetUserBillResponse(Right(billData))
    } catch {
      case e:Exception =>
        // Report through the actor's logger rather than printing to stderr.
        ERROR(e, "While processing %s", event)
        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
    }
  }

  /**
   * Replies with the current (realtime) balance of the user.
   *
   * `haveAgreements` and `haveUserState` are currently the same predicate, so only the
   * `(true, true)` and `(false, false)` cases can occur; the mixed cases are kept for the
   * time when the two notions are told apart.
   */
  def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
    checkInitial()

    val userID = event.userID

    (haveAgreements, haveUserState) match {
      case (true, true) ⇒
        // (User CREATEd, with balance state)
        val realtimeMillis = TimeHelpers.nowMillis()
        chargingService.calculateRealtimeUserState(
          this._userStateModel,
          aquarium.currentResourceMapping,
          realtimeMillis
        )

        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))

      case (true, false) ⇒
        // (User CREATEd, no balance state)
        // Return the default initial balance
        sender ! GetUserBalanceResponse(
          Right(
            GetUserBalanceResponseData(
              this.userID,
              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
            )))

      case (false, true) ⇒
        // (Not CREATEd, with balance state)
        // Clearly this is internal error
        sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)

      case (false, false) ⇒
        // (Not CREATEd, no balance state)
        // The user is completely unknown
        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
    }
  }

  /** Replies with the realtime user state, or with a 404 error response if there is no state. */
  def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      val realtimeMillis = TimeHelpers.nowMillis()
      chargingService.calculateRealtimeUserState(
        this._userStateModel,
        aquarium.currentResourceMapping,
        realtimeMillis
      )

      sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
    }
    else {
      sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
    }
  }

  /**
   * Replies with the realtime total credits and wallet entries of the user, or with a
   * 404 error response if nothing is known about the user.
   */
  def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
    checkInitial()

    if(haveUserState) {
      DEBUG("haveWorkingUserState: %s", event)
      val realtimeMillis = TimeHelpers.nowMillis()
      chargingService.calculateRealtimeUserState(
        this._userStateModel,
        aquarium.currentResourceMapping,
        realtimeMillis
      )

      sender ! GetUserWalletResponse(
        Right(
          GetUserWalletResponseData(
            this.userID,
            this._userStateModel.totalCredits,
            MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
          )))
    }
    else {
      DEBUG("!haveWorkingUserState: %s", event)
      if(haveAgreements) {
        // Same predicate as haveUserState for now, so this branch is not reachable yet.
        DEBUG("haveAgreements: %s", event)
        sender ! GetUserWalletResponse(
          Right(
            GetUserWalletResponseData(
              this.userID,
              Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
              MessageFactory.newWalletEntriesMsg()
            )))
      }
      else {
        DEBUG("!haveUserCreationIMEvent: %s", event)
        // Error code fixed: it used to contain a stray space ("AQU-WAL-00 8").
        sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
      }
    }
  }

  /**
   * Initializes the actor's internal state with a fresh, initial user state model
   * (zero credits, current time).
   *
   * @param userID the ID of the user this actor will serve
   */
  def onSetUserActorUserID(userID: String): Unit = {
    // Only the initial model is created here; the agreement history itself is built from
    // the original sources (IMEvents) later on, by checkInitial().
    this._userStateModel = ModelFactory.newInitialUserStateModel(
      userID,
      Real(0),
      TimeHelpers.nowMillis()
    )

    require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
  }

  // User ID for log prefixes; never throws, unlike userID.
  private[this] def D_userID = {
    if(haveUserState) userID else "???"
  }

  // Logging helpers: every message gets the "[userID] - " prefix.
  private[this] def DEBUG(fmt: String, args: Any*) =
    logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def INFO(fmt: String, args: Any*) =
    logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def WARN(fmt: String, args: Any*) =
    logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def ERROR(fmt: String, args: Any*) =
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))

  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
}