7ee1c7a2be433ac9c5376d3664875cb2665b5214
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
44 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
45 import gr.grnet.aquarium.util.date.TimeHelpers
46 import gr.grnet.aquarium.event.model.im.IMEventModel
47 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
48 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
49 import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
50 import gr.grnet.aquarium.AquariumInternalError
51 import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
52 import gr.grnet.aquarium.computation.BillingMonthInfo
53 import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
54
55 /**
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 class UserActor extends ReflectiveRoleableActor {
61   private[this] var _userID: String = "<?>"
62   private[this] var _imState: IMStateSnapshot = _
63   private[this] var _userState: UserState = _
64   private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
65
66   self.lifeCycle = Temporary
67
68   private[this] def _shutmedown(): Unit = {
69     if(haveUserState) {
70       UserActorCache.invalidate(_userID)
71     }
72
73     self.stop()
74   }
75
76   override protected def onThrowable(t: Throwable, message: AnyRef) = {
77     LogHelpers.logChainOfCauses(logger, t)
78     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
79
80     _shutmedown()
81   }
82
83   def role = UserActorRole
84
85   private[this] def userStateComputations = aquarium.userStateComputations
86
87   private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ {
88     aquarium.userStateStore.insertUserState(userState)
89   }
90
91   private[this] def _timestampTheshold = {
92     aquarium.userStateTimestampThreshold
93   }
94
95   private[this] def haveUserState = {
96     this._userState ne null
97   }
98
99   private[this] def haveIMState = {
100     this._imState ne null
101   }
102
103   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
104   }
105
106   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
107   }
108
109   private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
110     if(haveIMState) {
111       val (newState,
112            creationTimeChanged,
113            activationTimeChanged,
114            roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
115
116       this._imState = newState
117       (creationTimeChanged, activationTimeChanged, roleChanged)
118     } else {
119       this._imState = IMStateSnapshot.initial(imEvent)
120       (
121         imEvent.isCreateUser,
122         true, // first activation status is a change by default??
123         true  // first role is a change by default??
124         )
125     }
126   }
127
128   /**
129    * Creates the IMStateSnapshot and returns the number of updates it made to it.
130    */
131   private[this] def createInitialIMState(): Unit = {
132     val store = aquarium.imEventStore
133
134     var _roleCheck = None: Option[String]
135
136     // this._userID is already set up
137     store.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
138       DEBUG("Replaying %s", imEvent)
139
140       val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
141       _roleCheck = this._imState.roleHistory.lastRoleName
142
143       DEBUG(
144         "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
145         creationTimeChanged, activationTimeChanged, roleChanged,
146         imEvent
147       )
148     }
149
150     DEBUG("Initial %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
151     logSeparator()
152   }
153
154   /**
155    * Resource events are processed only if the user has been activated.
156    */
157   private[this] def shouldProcessResourceEvents: Boolean = {
158     haveIMState && this._imState.hasBeenCreated
159   }
160
161   private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
162     val userCreationMillis = this._imState.userCreationMillis.get
163     val initialRole = this._imState.roleHistory.firstRole.get.name
164
165     val userStateBootstrap = UserStateBootstrap(
166       this._userID,
167       userCreationMillis,
168       initialRole,
169       aquarium.initialAgreementForRole(initialRole, userCreationMillis),
170       aquarium.initialBalanceForRole(initialRole, userCreationMillis)
171     )
172
173     val now = TimeHelpers.nowMillis()
174     val userState = userStateComputations.doMonthBillingUpTo(
175       BillingMonthInfo.fromMillis(now),
176       now,
177       userStateBootstrap,
178       aquarium.currentResourcesMap,
179       InitialUserActorSetup(),
180       stdUserStateStoreFunc,
181       None
182     )
183
184     this._userState = userState
185
186     // Final touch: Update role history
187     if(haveIMState && haveUserState) {
188       if(this._userState.roleHistory != this._imState.roleHistory) {
189         this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
190       }
191     }
192   }
193
194   private[this] def createInitialUserState(event: InitializeUserState): Unit = {
195     if(!haveIMState) {
196       // Should have been created from `createIMState()`
197       DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
198       return
199     }
200
201     if(!this._imState.hasBeenCreated) {
202       // Cannot set the initial state!
203       DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
204       return
205     }
206
207     // We will also need this functionality when receiving IMEvents,
208     // so we place it in a method
209     loadUserStateAndUpdateRoleHistory()
210
211     if(haveUserState) {
212       DEBUG("Initial %s = %s", shortNameOfType[UserState], this._userState.toJsonString)
213       logSeparator()
214     }
215   }
216
217   def onInitializeUserState(event: InitializeUserState): Unit = {
218     this._userID = event.userID
219     DEBUG("Got %s", event)
220
221     createInitialIMState()
222     createInitialUserState(event)
223   }
224
225   /**
226    * Creates a new user state, taking into account the latest role history in IM state snapshot.
227    * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
228    * call this.
229    */
230   private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
231     // FIXME: Also update agreement
232     this._userState.newWithRoleHistory(this._imState.roleHistory, stateChangeReason)
233   }
234
235   /**
236    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
237    * When this method is called, we assume that all proper checks have been made and it
238    * is OK to proceed with the event processing.
239    */
240   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
241     val imEvent = processEvent.imEvent
242
243     if(!haveIMState) {
244       // This is an error. Should have been initialized from somewhere ...
245       throw new AquariumInternalError("Got %s while uninitialized".format(processEvent))
246     }
247
248     if(this._imState.latestIMEvent.id == imEvent.id) {
249       // This happens when the actor is brought to life, then immediately initialized, and then
250       // sent the first IM event. But from the initialization procedure, this IM event will have
251       // already been loaded from DB!
252       INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
253       logSeparator()
254
255       return
256     }
257
258     val (creationTimeChanged,
259          activationTimeChanged,
260          roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
261
262     DEBUG(
263       "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
264       creationTimeChanged, activationTimeChanged, roleChanged,
265       imEvent
266     )
267
268     // Must also update user state if we know when in history the life of a user begins
269     if(creationTimeChanged) {
270       if(!haveUserState) {
271         loadUserStateAndUpdateRoleHistory()
272         INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
273       } else {
274         // Just update role history
275         this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
276         INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
277       }
278     }
279
280     DEBUG("Updated %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
281     logSeparator()
282   }
283
284   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
285     val rcEvent = event.rcEvent
286
287     if(!shouldProcessResourceEvents) {
288       // This means the user has not been created (at least, as far as Aquarium is concerned).
289       // So, we do not process any resource event
290       DEBUG("Not processing %s", rcEvent.toJsonString)
291       logSeparator()
292
293       return
294     }
295
296     // Since the latest resource event per resource is recorded in the user state,
297     // we do not need to query the store. Just query the in-memory state.
298     // Note: This is a similar situation with the first IMEvent received right after the user
299     //       actor is created.
300     if(this._userState.isLatestResourceEventIDEqualTo(rcEvent.id)) {
301       INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
302       logSeparator()
303
304       return
305     }
306
307     val now = TimeHelpers.nowMillis()
308     val dt  = now - this._latestResourceEventOccurredMillis
309     val belowThreshold = dt <= _timestampTheshold
310
311     if(belowThreshold) {
312       this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
313
314       DEBUG("Below threshold (%s ms). Not processing %s", this._timestampTheshold, rcEvent.toJsonString)
315       logSeparator()
316
317       return
318     }
319
320     val userID = this._userID
321     val userCreationMillis = this._imState.userCreationMillis.get
322     val initialRole = this._imState.roleHistory.firstRoleName.getOrElse(aquarium.defaultInitialUserRole)
323     val initialAgreement = aquarium.initialAgreementForRole(initialRole, userCreationMillis)
324     val initialCredits   = aquarium.initialBalanceForRole(initialRole, userCreationMillis)
325     val userStateBootstrap = UserStateBootstrap(
326       userID,
327       userCreationMillis,
328       initialRole,
329       initialAgreement,
330       initialCredits
331     )
332     val billingMonthInfo = BillingMonthInfo.fromMillis(now)
333     val currentResourcesMap = aquarium.currentResourcesMap
334     val calculationReason = RealtimeBillingCalculation(None, now)
335     val eventOccurredMillis = rcEvent.occurredMillis
336
337 //    DEBUG("Using %s", currentResourcesMap.toJsonString)
338
339     this._userState = aquarium.userStateComputations.doMonthBillingUpTo(
340       billingMonthInfo,
341       // Take into account that the event may be out-of-sync.
342       // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
343       now max eventOccurredMillis,
344       userStateBootstrap,
345       currentResourcesMap,
346       calculationReason,
347       stdUserStateStoreFunc
348     )
349
350     this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
351
352     DEBUG("Updated %s = %s", shortClassNameOf(this._userState), this._userState.toJsonString)
353     logSeparator()
354   }
355
356   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
357     val userID = event.userID
358
359     (haveIMState, haveUserState) match {
360       case (true, true) ⇒
361         // (have IMState, have UserState)
362         this._imState.hasBeenActivated match {
363           case true ⇒
364             // (have IMState, activated, have UserState)
365             self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
366
367           case false ⇒
368             // (have IMState, not activated, have UserState)
369             // Since we have user state, we should have been activated
370             self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
371         }
372
373       case (true, false) ⇒
374         // (have IMState, no UserState)
375         this._imState.hasBeenActivated match {
376           case true  ⇒
377             // (have IMState, activated, no UserState)
378             // Since we are activated, we should have some state.
379             self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
380           case false ⇒
381             // (have IMState, not activated, no UserState)
382             // The user is virtually unknown
383             self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
384         }
385
386       case (false, true) ⇒
387         // (no IMState, have UserState)
388         // A bit ridiculous situation
389         self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
390
391       case (false, false) ⇒
392         // (no IMState, no UserState)
393         self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
394     }
395   }
396
397   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
398     haveUserState match {
399       case true ⇒
400         self reply GetUserStateResponse(Right(this._userState))
401
402       case false ⇒
403         self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)))
404     }
405   }
406
407   private[this] def D_userID = {
408     this._userID
409   }
410
411   private[this] def DEBUG(fmt: String, args: Any*) =
412     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
413
414   private[this] def INFO(fmt: String, args: Any*) =
415     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
416
417   private[this] def WARN(fmt: String, args: Any*) =
418     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
419
420   private[this] def ERROR(fmt: String, args: Any*) =
421     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
422
423   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
424     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
425 }