Reorg initialization seq
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.{Real, AquariumInternalError}
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.UserStateModel
54 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
55 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
56 import gr.grnet.aquarium.service.event.BalanceEvent
57 import gr.grnet.aquarium.util.date.TimeHelpers
58 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
59 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
60 import gr.grnet.aquarium.charging.bill.BillEntryMsg
61
62 /**
63  *
64  * @author Christos KK Loverdos <loverdos@gmail.com>
65  */
66
67 class UserActor extends ReflectiveRoleableActor {
// Count of IMEvent messages handled so far: set after a replay from the store and
// incremented for every live IMEvent that produces an agreement.
private[this] var _imMsgCount: Int = 0

// The per-user state model. It is assigned only by onSetUserActorUserID(), so it is
// null until that message has been processed.
private[this] var _userStateModel: UserStateModel = _
70
/**
 * The ID of the user served by this actor, as recorded in the user state model.
 *
 * Throws an `AquariumInternalError` if the user state model has not been set yet.
 */
def userID = {
  if(!haveUserState) {
    // Fill in the placeholder explicitly; a bare "%s" would otherwise end up verbatim
    // in the message (or break a formatting constructor).
    throw new AquariumInternalError("%s not initialized".format(shortClassNameOf(this)))
  }

  this._userStateModel.userID
}
78
/**
 * Lifecycle hook run after the actor has stopped: logs the fact and
 * notifies the Akka service about it.
 */
override def postStop(): Unit = {
  DEBUG("I am finally stopped (in postStop())")
  aquarium.akkaService.notifyUserActorPostStop(this)
}
83
/**
 * Asks the Akka service to invalidate this actor.
 * Does nothing when no user state model has been set.
 */
private[this] def shutmedown(): Unit = {
  val initialized = haveUserState
  if(initialized) aquarium.akkaService.invalidateUserActor(this)
}
89
/**
 * Error hook: logs the full chain of causes and the error itself,
 * then triggers the shutdown of this actor.
 *
 * @param t       the error that was raised
 * @param message the message being handled (not used here)
 */
override protected def onThrowable(t: Throwable, message: AnyRef) = {
  LogHelpers.logChainOfCauses(logger, t)

  val errorClass = shortClassNameOf(t)
  val errorText = t.getMessage
  ERROR(t, "Terminating due to: %s(%s)", errorClass, errorText)

  shutmedown()
}
96
/** The role of this actor: always `UserActorRole`. */
def role = {
  UserActorRole
}
98
/** Shortcut to the charging service of the enclosing Aquarium instance. */
private[this] def chargingService = {
  aquarium.chargingService
}
100
/**
 * Handler for the "properties loaded" configuration message.
 * This actor has nothing to do in response, so the handler is a no-op.
 */
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = ()
103
/**
 * The user creation IMEvent as exposed by the user state model.
 * "Unsafe": there is no guard here for a missing model or a missing creation event.
 */
private[this] def unsafeUserCreationIMEventMsg =
  this._userStateModel.unsafeUserCreationIMEvent
107
/**
 * `true` iff the user state model has been set.
 *
 * NOTE(review): as written this is exactly the same check as `haveUserState`;
 * confirm whether it should inspect the agreement history instead.
 */
private[this] def haveAgreements = {
  !(this._userStateModel eq null)
}
111
/**
 * `true` iff we have agreements and the user state model reports a user creation event.
 * The model is consulted only when `haveAgreements` holds.
 */
private[this] def haveUserCreationEvent = {
  if(haveAgreements) this._userStateModel.hasUserCreationEvent else false
}
116
/** `true` iff the user state model has been set (is not `null`). */
private[this] def haveUserState = this._userStateModel ne null
120
/**
 * `true` iff the user state model exists and is still in its initial state.
 *
 * The null guard matters because `checkInitial()` is the first thing every message
 * handler calls: without it, a message arriving before the model is set would fail
 * with a `NullPointerException` instead of reaching the handlers' "no state" branches.
 */
private[this] def isInitial = haveUserState && this._userStateModel.isInitial
122
/**
 * Creates the agreement history from all the stored IMEvents, visiting them in
 * occurrence order.
 *
 * For each visited event the callback answers `false` if it is the very first event and
 * is not a user CREATE event; otherwise it records an agreement for the event in the
 * user state model and answers `true`. As a side effect, `_imMsgCount` is set to the
 * number of events visited.
 *
 * @param userID the user whose stored IMEvents are replayed
 * @return (`true` iff there was a user CREATE event, the number of events processed)
 */
private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
  DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
  assert(haveUserState, "haveUserState")

  var _imcounter = 0

  val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
    _imcounter += 1
    DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)

    if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
      // The very first event must be a CREATE event. Otherwise we abort initialization.
      // This will normally happen during devops :)
      INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
      false
    }
    else {
      val effectiveFromMillis = imEvent.getOccurredMillis
      val role = imEvent.getRole

      // The unsafe call is made for its side effect, so it must happen outside assert():
      // assertions are elidable and an elided assert() would drop the call altogether.
      val priceTable = aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis)
      assert(
        priceTable ne null,
        "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
      )

      this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
      true
    }
  }

  this._imMsgCount = _imcounter

  DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
  (hadCreateEvent, _imcounter)
}
164
/**
 * Marks the current user state message as the first one, inserts it into the user state
 * store and updates the model with whatever the store hands back.
 *
 * @param userID not used by the body; the state message of the model is what gets stored
 */
private[this] def saveFirstUserState(userID: String): Unit = {
  val model = this._userStateModel

  model.userStateMsg.setIsFirst(true)
  val stored = aquarium.userStateStore.insertUserState(model.userStateMsg)
  model.updateUserStateMsg(stored)
}
171
/**
 * Marks the current user state message as not being the first one, inserts it into the
 * user state store and updates the model with whatever the store hands back.
 */
private[this] def saveSubsequentUserState(): Unit = {
  val model = this._userStateModel

  model.userStateMsg.setIsFirst(false)
  val stored = aquarium.userStateStore.insertUserState(model.userStateMsg)
  model.updateUserStateMsg(stored)
}
178
/**
 * Looks up the latest stored user state for this user. If one exists, the model is
 * updated with it; if none exists, the current state is saved as the first one.
 */
private[this] def loadLastKnownUserStateAndUpdateAgreements(): Unit = {
  val userID = this._userStateModel.userID
  val latestUserStateOpt = aquarium.userStateStore.findLatestUserState(userID)

  latestUserStateOpt match {
    case Some(latestUserState) ⇒
      this._userStateModel.updateUserStateMsg(latestUserState)

    case None ⇒
      // First user state ever
      saveFirstUserState(userID)
  }
}
190
/**
 * Placeholder: meant to update the user state snapshot with fresh
 * (ie not previously processed) events. Currently does nothing.
 */
private[this] def processResourceEventsAfterLastKnownUserState(): Unit = {
  // Nothing implemented yet.
}
195
/**
 * Brings the user state up to date in two steps: first load (or create) the last known
 * user state, then process the resource events that came after it.
 */
private[this] def makeUserStateMsgUpToDate(): Unit = {
  loadLastKnownUserStateAndUpdateAgreements()
  processResourceEventsAfterLastKnownUserState()
}
200
/**
 * Performs the one-off initialization work if the model is still in its initial state.
 *
 * When it is, the agreement history is rebuilt from the stored IMEvents, the user state
 * is brought up to date (only if a user CREATE event was found) and `nextThing` is run.
 *
 * @param nextThing callback run at the end of the initialization work; defaults to a no-op
 * @return `true` iff the model was in its initial state on entry
 */
private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
  if(isInitial) {
    val (userCreated, _) = createUserAgreementHistoryFromIMEvents(userID)

    if(userCreated) {
      makeUserStateMsgUpToDate()
    }

    nextThing()

    true
  }
  else {
    false
  }
}
216
/**
 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
 * messaging hub (rabbitmq).
 *
 * If this call is the one that runs the initialization (`checkInitial()` answers `true`),
 * nothing more is done with `imEvent` here. Out-of-sync events are currently dropped.
 * Otherwise a new agreement is recorded and, for a user CREATE event, the user state is
 * brought up to date.
 */
def onIMEventMsg(imEvent: IMEventMsg): Unit = {
  if(!checkInitial()) {
    // Out of sync regarding IMEvents: the event occurred before the latest one we know.
    // FIXME implement (clear all resource state); for now such events are ignored.
    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis

    // Out of sync regarding ResourceEvents.
    // FIXME implement; TODO decide what to do when it is detected.
    val isOutOfSyncRC = false

    if(!isOutOfSyncIM && !isOutOfSyncRC) {
      // Make new agreement
      this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
      this._imMsgCount += 1

      if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
        makeUserStateMsgUpToDate()
      }

      DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
    }
  }
}
253
/**
 * Processes a resource event: charges it against the user state and publishes a
 * `BalanceEvent` when the balance switches between non-negative and negative.
 *
 * If this call is the one that runs the initialization (`checkInitial()` answers `true`),
 * nothing more is done with `rcEvent` here. The event is also ignored when there is no
 * user creation event.
 */
def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
  if(!checkInitial()) {
    if(!haveUserCreationEvent) {
      DEBUG("No agreements. Ignoring %s", rcEvent)
    }
    else {
      assert(haveUserState, "haveUserState")

      val oldTotalCredits = this._userStateModel.totalCreditsAsReal

      chargingService.processResourceEvent(
        rcEvent.getReceivedMillis,
        rcEvent,
        this._userStateModel,
        aquarium.currentResourceMapping,
        true
      )

      val newTotalCredits = this._userStateModel.totalCreditsAsReal

      // Compare the same predicate that the event carries. A product-of-signs test
      // misses every transition that passes through exactly zero (e.g. 5 -> 0 -> -3).
      val wasNonNegative = oldTotalCredits >= 0
      val isNonNegative = newTotalCredits >= 0
      if(wasNonNegative != isNonNegative) {
        aquarium.eventBus ! new BalanceEvent(userID, isNonNegative)
      }

      DEBUG("Updated %s", this._userStateModel)
    }
  }
}
285
/**
 * Answers a bill request for the timeslot carried by `event`.
 *
 * The resource types are taken from the first policy model found within the timeslot
 * (an empty map if there is none). Any `Exception` raised while building the bill is
 * logged and reported to the sender as a 500 response.
 */
def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
  checkInitial()

  try {
    val timeslot = event.timeslot
    val policyModels = aquarium.policyStore.loadSortedPolicyModelsWithin(
      timeslot.from.getTime,
      timeslot.to.getTime
    )
    val resourceTypes: Map[String, ResourceType] = policyModels.values.headOption match {
      case None => Map[String, ResourceType]()
      case Some(policy: PolicyModel) => policy.resourceTypesMap
    }

    val state = if(haveUserState) Some(this._userStateModel.userStateMsg) else None
    val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot, this.userID, state, resourceTypes)
    val billData = GetUserBillResponseData(this.userID, billEntryMsg)

    sender ! GetUserBillResponse(Right(billData))
  } catch {
    case e: Exception =>
      // Go through the actor's logger rather than printing the stack trace to stderr.
      ERROR(e, "While computing bill for %s", event)
      sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
  }
}
310
/**
 * Answers a balance request, depending on whether we have agreements and a user state:
 *
 *  - both: recompute the realtime state and reply with the total credits,
 *  - agreements only: reply with the default initial balance,
 *  - state only: reply with an internal error (500),
 *  - neither: reply that the user is unknown (404).
 */
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
  checkInitial()

  val requestedUserID = event.userID
  val agreementsKnown = haveAgreements
  val stateKnown = haveUserState

  if(agreementsKnown && stateKnown) {
    // (User CREATEd, with balance state)
    val realtimeMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userStateModel,
      aquarium.currentResourceMapping,
      realtimeMillis
    )

    val data = GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)
    sender ! GetUserBalanceResponse(Right(data))
  }
  else if(agreementsKnown) {
    // (User CREATEd, no balance state)
    // Return the default initial balance
    val data = GetUserBalanceResponseData(
      this.userID,
      aquarium.initialUserBalance(
        this.unsafeUserCreationIMEventMsg.getRole,
        this.unsafeUserCreationIMEventMsg.getOccurredMillis
      ).toString()
    )
    sender ! GetUserBalanceResponse(Right(data))
  }
  else if(stateKnown) {
    // (Not CREATEd, with balance state)
    // Clearly this is internal error
    sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
  }
  else {
    // (Not CREATEd, no balance state)
    // The user is completely unknown
    sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(requestedUserID)), 404)
  }
}
349
/**
 * Answers a user state request. With a user state at hand, the realtime state is
 * recomputed and the state message is sent back; otherwise the reply is a 404.
 */
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
  checkInitial()

  if(haveUserState) {
    val realtimeMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userStateModel,
      aquarium.currentResourceMapping,
      realtimeMillis
    )

    sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
  }
  else {
    val notFound = "No state for user %s [AQU-STA-0006]".format(event.userID)
    sender ! GetUserStateResponse(Left(notFound), 404)
  }
}
368
/**
 * Answers a wallet request.
 *
 *  - With a user state: recompute the realtime state and reply with the total credits
 *    and the wallet entries of the state message.
 *  - Without a state but with agreements: reply with the default initial balance and
 *    an empty set of wallet entries.
 *  - Otherwise: reply with a 404.
 */
def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
  checkInitial()

  if(haveUserState) {
    DEBUG("haveWorkingUserState: %s", event)
    val realtimeMillis = TimeHelpers.nowMillis()
    chargingService.calculateRealtimeUserState(
      this._userStateModel,
      aquarium.currentResourceMapping,
      realtimeMillis
    )

    sender ! GetUserWalletResponse(
      Right(
        GetUserWalletResponseData(
          this.userID,
          this._userStateModel.totalCredits,
          MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
        )))
  }
  else {
    DEBUG("!haveWorkingUserState: %s", event)

    if(haveAgreements) {
      DEBUG("haveAgreements: %s", event)
      sender ! GetUserWalletResponse(
        Right(
          GetUserWalletResponseData(
            this.userID,
            Real.toMsgField(
              aquarium.initialUserBalance(
                this.unsafeUserCreationIMEventMsg.getRole,
                this.unsafeUserCreationIMEventMsg.getOccurredMillis
              )
            ),
            MessageFactory.newWalletEntriesMsg()
          )))
    }
    else {
      DEBUG("!haveUserCreationIMEvent: %s", event)
      // Error code follows the AQU-XXX-NNNN shape used by the other handlers
      // (it used to contain a stray space: "AQU-WAL-00 8").
      sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-0008]".format(event.userID)), 404)
    }
  }
}
409
/**
 * Initializes the actor's internal state by installing a fresh, initial user state
 * model for the given user. The agreement history is not built here; that happens
 * later, through `checkInitial()`.
 *
 * @param userID the ID of the user this actor will serve
 */
def onSetUserActorUserID(userID: String): Unit = {
  val zero = Real(0)
  val nowMillis = TimeHelpers.nowMillis()

  this._userStateModel = ModelFactory.newInitialUserStateModel(userID, zero, nowMillis)

  // Sanity check: a freshly created model must report itself as initial.
  require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
}
425
/** The user ID for use in log lines; "???" while no user state model has been set. */
private[this] def D_userID = {
  if(!haveUserState) "???" else userID
}
429
/** Logs at debug level; the formatted message is prefixed with the user ID. */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val who = D_userID
  val what = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(who, what))
}
432
/** Logs at info level; the formatted message is prefixed with the user ID. */
private[this] def INFO(fmt: String, args: Any*) = {
  val who = D_userID
  val what = fmt.format(args: _*)
  logger.info("[%s] - %s".format(who, what))
}
435
/** Logs at warn level; the formatted message is prefixed with the user ID. */
private[this] def WARN(fmt: String, args: Any*) = {
  val who = D_userID
  val what = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(who, what))
}
438
/** Logs at error level; the formatted message is prefixed with the user ID. */
private[this] def ERROR(fmt: String, args: Any*) = {
  val who = D_userID
  val what = fmt.format(args: _*)
  logger.error("[%s] - %s".format(who, what))
}
441
/**
 * Logs at error level together with the given throwable; the formatted message is
 * prefixed with the user ID.
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val who = D_userID
  val what = fmt.format(args: _*)
  logger.error("[%s] - %s".format(who, what), t)
}
444 }