Second cut of the new policy configuration system
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
48 import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
49 import gr.grnet.aquarium.AquariumInternalError
50 import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
51 import gr.grnet.aquarium.computation.BillingMonthInfo
52 import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
53
54 /**
55  *
56  * @author Christos KK Loverdos <loverdos@gmail.com>
57  */
58
59 class UserActor extends ReflectiveRoleableActor {
60   @volatile private[this] var _userID: String = "<?>"
61   @volatile private[this] var _imState: IMStateSnapshot = _
62   @volatile private[this] var _userState: UserState = _
63   @volatile private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
64
65   def userID = {
66     if(this._userID eq null) {
67       throw new AquariumInternalError("%s not initialized ")
68     }
69
70     this._userID
71   }
72
73   override def postStop() {
74     DEBUG("I am finally stopped (in postStop())")
75     aquarium.akkaService.notifyUserActorPostStop(this)
76   }
77
78   private[this] def shutmedown(): Unit = {
79     if(haveIMState) {
80       aquarium.akkaService.invalidateUserActor(this)
81     }
82   }
83
84   override protected def onThrowable(t: Throwable, message: AnyRef) = {
85     LogHelpers.logChainOfCauses(logger, t)
86     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
87
88     shutmedown()
89   }
90
91   def role = UserActorRole
92
93   private[this] def userStateComputations = aquarium.userStateComputations
94
95   private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ {
96     aquarium.userStateStore.insertUserState(userState)
97   }
98
99   private[this] def _timestampTheshold = {
100     aquarium.userStateTimestampThreshold
101   }
102
103   private[this] def haveUserState = {
104     this._userState ne null
105   }
106
107   private[this] def haveIMState = {
108     this._imState ne null
109   }
110
111   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
112   }
113
114   private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
115     if(haveIMState) {
116       val (newState,
117            creationTimeChanged,
118            activationTimeChanged,
119            roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
120
121       this._imState = newState
122       (creationTimeChanged, activationTimeChanged, roleChanged)
123     } else {
124       this._imState = IMStateSnapshot.initial(imEvent)
125       (
126         imEvent.isCreateUser,
127         true, // first activation status is a change by default??
128         true  // first role is a change by default??
129         )
130     }
131   }
132
133   /**
134    * Creates the IMStateSnapshot and returns the number of updates it made to it.
135    */
136   private[this] def createInitialIMState(): Unit = {
137     val store = aquarium.imEventStore
138
139     var _roleCheck = None: Option[String]
140
141     // this._userID is already set up
142     store.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
143       DEBUG("Replaying %s", imEvent)
144
145       val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
146       _roleCheck = this._imState.roleHistory.lastRoleName
147
148       DEBUG(
149         "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
150         creationTimeChanged, activationTimeChanged, roleChanged,
151         imEvent
152       )
153     }
154
155     if(haveIMState) {
156       DEBUG("Initial %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
157       logSeparator()
158     }
159   }
160
161   /**
162    * Resource events are processed only if the user has been activated.
163    */
164   private[this] def shouldProcessResourceEvents: Boolean = {
165     haveIMState && this._imState.hasBeenCreated
166   }
167
168   private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
169     val userCreationMillis = this._imState.userCreationMillis.get
170     val initialRole = this._imState.roleHistory.firstRole.get.name
171
172     val userStateBootstrap = UserStateBootstrap(
173       this._userID,
174       userCreationMillis,
175       aquarium.initialUserAgreementForRole(initialRole, userCreationMillis),
176       aquarium.initialBalanceForRole(initialRole, userCreationMillis)
177     )
178
179     val now = TimeHelpers.nowMillis()
180     val userState = userStateComputations.doMonthBillingUpTo(
181       BillingMonthInfo.fromMillis(now),
182       now,
183       userStateBootstrap,
184       aquarium.currentResourceTypesMap,
185       InitialUserActorSetup(),
186       stdUserStateStoreFunc,
187       None
188     )
189
190     this._userState = userState
191
192     // Final touch: Update role history
193     if(haveIMState && haveUserState) {
194       if(this._userState.roleHistory != this._imState.roleHistory) {
195         this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
196       }
197     }
198   }
199
200   private[this] def createInitialUserState(event: InitializeUserState): Unit = {
201     if(!haveIMState) {
202       // Should have been created from `createIMState()`
203       DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
204       return
205     }
206
207     if(!this._imState.hasBeenCreated) {
208       // Cannot set the initial state!
209       DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
210       return
211     }
212
213     // We will also need this functionality when receiving IMEvents,
214     // so we place it in a method
215     loadUserStateAndUpdateRoleHistory()
216
217     if(haveUserState) {
218       DEBUG("Initial %s = %s", shortNameOfType[UserState], this._userState.toJsonString)
219       logSeparator()
220     }
221   }
222
223   def onInitializeUserState(event: InitializeUserState): Unit = {
224     this._userID = event.userID
225     DEBUG("Got %s", event)
226
227     createInitialIMState()
228     createInitialUserState(event)
229   }
230
231   /**
232    * Creates a new user state, taking into account the latest role history in IM state snapshot.
233    * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
234    * call this.
235    */
236   private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
237     // FIXME: Also update agreement
238     this._userState.newWithRoleHistory(this._imState.roleHistory, stateChangeReason)
239   }
240
241   /**
242    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
243    * When this method is called, we assume that all proper checks have been made and it
244    * is OK to proceed with the event processing.
245    */
246   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
247     val imEvent = processEvent.imEvent
248
249     if(!haveIMState) {
250       // This is an error. Should have been initialized from somewhere ...
251       throw new AquariumInternalError("Got %s while uninitialized".format(processEvent))
252     }
253
254     if(this._imState.latestIMEvent.id == imEvent.id) {
255       // This happens when the actor is brought to life, then immediately initialized, and then
256       // sent the first IM event. But from the initialization procedure, this IM event will have
257       // already been loaded from DB!
258       INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
259       logSeparator()
260
261       return
262     }
263
264     val (creationTimeChanged,
265          activationTimeChanged,
266          roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
267
268     DEBUG(
269       "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
270       creationTimeChanged, activationTimeChanged, roleChanged,
271       imEvent
272     )
273
274     // Must also update user state if we know when in history the life of a user begins
275     if(creationTimeChanged) {
276       if(!haveUserState) {
277         loadUserStateAndUpdateRoleHistory()
278         INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
279       } else {
280         // Just update role history
281         this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
282         INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
283       }
284     }
285
286     DEBUG("Updated %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
287     logSeparator()
288   }
289
290   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
291     val rcEvent = event.rcEvent
292
293     if(!shouldProcessResourceEvents) {
294       // This means the user has not been created (at least, as far as Aquarium is concerned).
295       // So, we do not process any resource event
296       DEBUG("Not processing %s", rcEvent.toJsonString)
297       logSeparator()
298
299       return
300     }
301
302     // Since the latest resource event per resource is recorded in the user state,
303     // we do not need to query the store. Just query the in-memory state.
304     // Note: This is a similar situation with the first IMEvent received right after the user
305     //       actor is created.
306     if(this._userState.isLatestResourceEventIDEqualTo(rcEvent.id)) {
307       INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
308       logSeparator()
309
310       return
311     }
312
313     val now = TimeHelpers.nowMillis()
314     val dt  = now - this._latestResourceEventOccurredMillis
315     val belowThreshold = dt <= _timestampTheshold
316
317     if(belowThreshold) {
318       this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
319
320       DEBUG("Below threshold (%s ms). Not processing %s", this._timestampTheshold, rcEvent.toJsonString)
321       logSeparator()
322
323       return
324     }
325
326     val userID = this._userID
327     val userCreationMillis = this._imState.userCreationMillis.get
328     val initialRole = this._imState.roleHistory.firstRoleName.getOrElse(aquarium.defaultInitialUserRole)
329     val initialAgreement = aquarium.initialUserAgreementForRole(initialRole, userCreationMillis)
330     val initialCredits   = aquarium.initialBalanceForRole(initialRole, userCreationMillis)
331     val userStateBootstrap = UserStateBootstrap(
332       userID,
333       userCreationMillis,
334       initialAgreement,
335       initialCredits
336     )
337     val billingMonthInfo = BillingMonthInfo.fromMillis(now)
338     val currentResourcesMap = aquarium.currentResourceTypesMap
339     val calculationReason = RealtimeBillingCalculation(None, now)
340     val eventOccurredMillis = rcEvent.occurredMillis
341
342 //    DEBUG("Using %s", currentResourceTypesMap.toJsonString)
343
344     this._userState = aquarium.userStateComputations.doMonthBillingUpTo(
345       billingMonthInfo,
346       // Take into account that the event may be out-of-sync.
347       // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
348       now max eventOccurredMillis,
349       userStateBootstrap,
350       currentResourcesMap,
351       calculationReason,
352       stdUserStateStoreFunc
353     )
354
355     this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
356
357     DEBUG("Updated %s = %s", shortClassNameOf(this._userState), this._userState.toJsonString)
358     logSeparator()
359   }
360
361   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
362     val userID = event.userID
363
364     (haveIMState, haveUserState) match {
365       case (true, true) ⇒
366         // (have IMState, have UserState)
367         this._imState.hasBeenActivated match {
368           case true ⇒
369             // (have IMState, activated, have UserState)
370             sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
371
372           case false ⇒
373             // (have IMState, not activated, have UserState)
374             // Since we have user state, we should have been activated
375             sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
376         }
377
378       case (true, false) ⇒
379         // (have IMState, no UserState)
380         this._imState.hasBeenActivated match {
381           case true  ⇒
382             // (have IMState, activated, no UserState)
383             // Since we are activated, we should have some state.
384             sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
385           case false ⇒
386             // (have IMState, not activated, no UserState)
387             // The user is virtually unknown
388             sender ! GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
389         }
390
391       case (false, true) ⇒
392         // (no IMState, have UserState)
393         // A bit ridiculous situation
394         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
395
396       case (false, false) ⇒
397         // (no IMState, no UserState)
398         sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
399     }
400   }
401
402   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
403     haveUserState match {
404       case true ⇒
405         sender ! GetUserStateResponse(Right(this._userState))
406
407       case false ⇒
408         sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
409     }
410   }
411
412   private[this] def D_userID = {
413     this._userID
414   }
415
416   private[this] def DEBUG(fmt: String, args: Any*) =
417     logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
418
419   private[this] def INFO(fmt: String, args: Any*) =
420     logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
421
422   private[this] def WARN(fmt: String, args: Any*) =
423     logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
424
425   private[this] def ERROR(fmt: String, args: Any*) =
426     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
427
428   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
429     logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
430 }