WIP event handling
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.util.{shortClassNameOf, shortNameOfClass, shortNameOfType}
44 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
45 import gr.grnet.aquarium.computation.data.IMStateSnapshot
46 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
47 import gr.grnet.aquarium.computation.{BillingMonthInfo, UserStateBootstrappingData, UserState}
48 import gr.grnet.aquarium.util.date.TimeHelpers
49 import gr.grnet.aquarium.event.model.im.IMEventModel
50 import gr.grnet.aquarium.{AquariumException, Aquarium}
51 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
52 import gr.grnet.aquarium.computation.reason.{InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
53
54 /**
55  *
56  * @author Christos KK Loverdos <loverdos@gmail.com>
57  */
58
59 class UserActor extends ReflectiveRoleableActor {
60   private[this] var _userID: String = "<?>"
61   private[this] var _imState: IMStateSnapshot = _
62   private[this] var _userState: UserState = _
63
64   self.lifeCycle = Temporary
65
66   private[this] def _shutmedown(): Unit = {
67     if(haveUserState) {
68       UserActorCache.invalidate(_userID)
69     }
70
71     self.stop()
72   }
73
74   override protected def onThrowable(t: Throwable, message: AnyRef) = {
75     logChainOfCauses(t)
76     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
77
78     _shutmedown()
79   }
80
81   def role = UserActorRole
82
83   private[this] def aquarium: Aquarium = Aquarium.Instance
84   private[this] def userStateComputations = aquarium.userStateComputations
85
86   private[this] def _timestampTheshold = {
87     aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
88   }
89
90   private[this] def haveUserState = {
91     this._userState ne null
92   }
93
94   private[this] def haveIMState = {
95     this._imState ne null
96   }
97
98   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
99   }
100
101   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
102   }
103
104   private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel): (Boolean, Boolean, String) = {
105     if(haveIMState) {
106       val currentRole = this._imState.roleHistory.lastRole.map(_.name).getOrElse(null)
107 //      logger.debug("Current role = %s".format(currentRole))
108
109       if(imEvent.role != currentRole) {
110 //        logger.debug("New role = %s".format(imEvent.role))
111         this._imState = this._imState.updateRoleHistoryWithEvent(imEvent)
112         (true, false, "")
113       } else {
114         val noUpdateReason = "Same role '%s'".format(currentRole)
115 //        logger.debug(noUpdateReason)
116         (false, false, noUpdateReason)
117       }
118     } else {
119       this._imState = IMStateSnapshot.initial(imEvent)
120       (true, true, "")
121     }
122   }
123
124   /**
125    * Creates the IMStateSnapshot and returns the number of updates it made to it.
126    */
127   private[this] def createIMState(event: InitializeUserState): Int = {
128     val userID = event.userID
129     val store = aquarium.imEventStore
130
131     var _updateCount = 0
132
133     store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
134       DEBUG("Replaying %s", imEvent)
135
136       val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
137       if(updated) {
138         _updateCount = _updateCount + 1
139         DEBUG("Updated %s for role '%s'", shortNameOfType[IMStateSnapshot], imEvent.role)
140       } else {
141         DEBUG("Not updated %s due to: %s", shortNameOfType[IMStateSnapshot], noUpdateReason)
142       }
143     }
144
145     if(_updateCount > 0)
146       DEBUG("Computed %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
147     else
148       DEBUG("Not computed %s", shortNameOfType[IMStateSnapshot])
149
150     _updateCount
151   }
152
153   /**
154    * Resource events are processed only if the user has been activated.
155    */
156   private[this] def shouldProcessResourceEvents: Boolean = {
157     haveIMState && this._imState.hasBeenActivated
158   }
159
160   private[this] def createUserState(event: InitializeUserState): Unit = {
161     if(!haveIMState) {
162       // Should have been created from `createIMState()`
163       DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
164       return
165     }
166
167     if(!this._imState.hasBeenActivated) {
168       // Cannot set the initial state!
169       DEBUG("Cannot create %s from %s, since user is inactive", shortNameOfType[UserState], event)
170       return
171     }
172
173     val userActivationMillis = this._imState.userActivationMillis.get
174     val initialRole = this._imState.roleHistory.firstRole.get.name
175
176     val userStateBootstrap = UserStateBootstrappingData(
177       this._userID,
178       userActivationMillis,
179       initialRole,
180       aquarium.initialAgreementForRole(initialRole, userActivationMillis),
181       aquarium.initialBalanceForRole(initialRole, userActivationMillis)
182     )
183
184     val userState = userStateComputations.doFullMonthlyBilling(
185       userStateBootstrap,
186       BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()),
187       aquarium.currentResourcesMap,
188       InitialUserStateSetup,
189       None
190     )
191
192     this._userState = userState
193
194     // Final touch: Update role history
195     if(haveIMState && haveUserState) {
196       // FIXME: Not satisfied with this redundant info
197       if(this._userState.roleHistory != this._imState.roleHistory) {
198         this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup)
199       }
200     }
201
202     if(haveUserState) {
203       DEBUG("%s = %s", shortNameOfType[UserState], this._userState)
204     }
205   }
206
207   def onInitializeUserState(event: InitializeUserState): Unit = {
208     val userID = event.userID
209     this._userID = userID
210     DEBUG("Got %s", event)
211
212     createIMState(event)
213     createUserState(event)
214   }
215
216   /**
217    * Creates a new user state, taking into account the latest role history in IM state snapshot.
218    * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
219    * call this.
220    */
221   private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
222     this._userState.copy(
223       roleHistory = this._imState.roleHistory,
224       // FIXME: Also update agreement
225       stateChangeCounter = this._userState.stateChangeCounter + 1,
226       lastChangeReason = stateChangeReason
227     )
228   }
229
230   /**
231    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
232    * When this method is called, we assume that all proper checks have been made and it
233    * is OK to proceed with the event processing.
234    */
235   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
236     val imEvent = processEvent.imEvent
237
238     if(!haveIMState) {
239       // This is an error. Should have been initialized from somewhere ...
240       throw new AquariumException("Got %s while being uninitialized".format(processEvent))
241     }
242
243     if(this._imState.latestIMEvent.id == imEvent.id) {
244       // This happens when the actor is brought to life, then immediately initialized, and then
245       // sent the first IM event. But from the initialization procedure, this IM event will have
246       // already been loaded from DB!
247       INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
248       logSeparator()
249       return
250     }
251
252     val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
253
254     if(updated) {
255       DEBUG("Updated %s = %s", shortClassNameOf(this._imState), this._imState)
256
257       // Must also update user state
258       if(shouldProcessResourceEvents) {
259         if(haveUserState) {
260           DEBUG("Also updating %s with new %s",
261             shortClassNameOf(this._userState),
262             shortClassNameOf(this._imState.roleHistory)
263           )
264
265           this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
266         }
267       }
268     } else {
269       DEBUG("Not updating %s from %s due to: %s", shortNameOfType[IMStateSnapshot], imEvent, noUpdateReason)
270     }
271
272     logSeparator()
273   }
274
275   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
276     val rcEvent = event.rcEvent
277
278     if(!shouldProcessResourceEvents) {
279       // This means the user has not been activated. So, we do not process any resource event
280       DEBUG("Not processing %s", rcEvent.toJsonString)
281       logSeparator()
282       return
283     }
284   }
285
286
287   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
288     val userID = event.userID
289
290     (haveIMState, haveUserState) match {
291       case (true, true) ⇒
292         // (have IMState, have UserState)
293         this._imState.hasBeenActivated match {
294           case true ⇒
295             // (have IMState, activated, have UserState)
296             self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
297
298           case false ⇒
299             // (have IMState, not activated, have UserState)
300             // Since we have user state, we should have been activated
301             self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
302         }
303
304       case (true, false) ⇒
305         // (have IMState, no UserState)
306         this._imState.hasBeenActivated match {
307           case true  ⇒
308             // (have IMState, activated, no UserState)
309             // Since we are activated, we should have some state.
310             self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
311           case false ⇒
312             // (have IMState, not activated, no UserState)
313             // The user is virtually unknown
314             self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
315         }
316
317       case (false, true) ⇒
318         // (no IMState, have UserState
319         // A bit ridiculous situation
320         self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
321
322       case (false, false) ⇒
323         // (no IMState, no UserState)
324         self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
325     }
326   }
327
328   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
329     haveUserState match {
330       case true ⇒
331         self reply GetUserStateResponse(Right(this._userState))
332
333       case false ⇒
334         self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)))
335     }
336   }
337
338   private[this] def D_userID = {
339     this._userID
340   }
341
342   private[this] def DEBUG(fmt: String, args: Any*) =
343     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
344
345   private[this] def INFO(fmt: String, args: Any*) =
346     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
347
348   private[this] def WARN(fmt: String, args: Any*) =
349     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
350
351   private[this] def ERROR(fmt: String, args: Any*) =
352     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
353
354   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
355     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
356 }