2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
48 import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
49 import gr.grnet.aquarium.AquariumInternalError
50 import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
51 import gr.grnet.aquarium.computation.BillingMonthInfo
52 import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
56 * @author Christos KK Loverdos <loverdos@gmail.com>
59 class UserActor extends ReflectiveRoleableActor {
// Mutable state of this actor. All four fields are @volatile vars private to this instance.
// User ID: starts as the "<?>" placeholder and is assigned from InitializeUserState.userID.
60 @volatile private[this] var _userID: String = "<?>"
// IM state snapshot: stays at the default (null) until _updateIMStateRoleHistory assigns it; see haveIMState.
61 @volatile private[this] var _imState: IMStateSnapshot = _
// User (billing) state: stays at the default (null) until it is computed; see haveUserState.
62 @volatile private[this] var _userState: UserState = _
// occurredMillis of the last resource event recorded by onProcessResourceEvent; seeded with the current time.
63 @volatile private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
// Fails with AquariumInternalError when the user ID is null.
// NOTE(review): `_userID` is initialized to "<?>" at its declaration, so this check can only fire
// if null is assigned later (e.g. from an event whose userID is null) — confirm the intended sentinel.
// NOTE(review): the "%s" in the message is never passed through format, so the exception text
// contains a literal "%s"; it probably needs `.format(...)` with the actor or field name.
66 if(this._userID eq null) {
67 throw new AquariumInternalError("%s not initialized ")
/**
 * Hook overridden from the actor base class: logs that the actor has
 * stopped and passes this actor to
 * `aquarium.akkaService.notifyUserActorPostStop`.
 *
 * Declared with an explicit `: Unit =` instead of procedure syntax, which
 * is deprecated in recent Scala 2 releases and removed in Scala 3.
 */
override def postStop(): Unit = {
  DEBUG("I am finally stopped (in postStop())")
  aquarium.akkaService.notifyUserActorPostStop(this)
}
// Calls aquarium.akkaService.invalidateUserActor with this actor.
// NOTE(review): part of this method's body is not visible here, so any guard around the call
// below cannot be confirmed.
78 private[this] def shutmedown(): Unit = {
80 aquarium.akkaService.invalidateUserActor(this)
// Error hook: logs the chain of causes of `t`, then an ERROR line with the throwable's short
// class name and message. `message` is not used in the lines visible here.
// NOTE(review): the log text says "Terminating", but the statement that would actually stop the
// actor is not visible here — presumably a call to shutmedown(); confirm.
84 override protected def onThrowable(t: Throwable, message: AnyRef) = {
85 LogHelpers.logChainOfCauses(logger, t)
86 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
// The role of this actor; always UserActorRole.
91 def role = UserActorRole
// Shortcut for aquarium.userStateComputations; re-read from `aquarium` on every call.
93 private[this] def userStateComputations = aquarium.userStateComputations
/**
 * The standard "store" callback passed to `doMonthBillingUpTo`: a function
 * that inserts the given [[UserState]] through `aquarium.userStateStore`.
 * A fresh function value is produced on each call.
 */
private[this] def stdUserStateStoreFunc = {
  (state: UserState) ⇒ aquarium.userStateStore.insertUserState(state)
}
/**
 * The timestamp threshold configured in `aquarium`; resource events closer
 * together than this are logged as "below threshold".
 */
private[this] def _timestampTheshold = aquarium.userStateTimestampThreshold
/** True once `_userState` has been assigned, i.e. it is no longer null. */
private[this] def haveUserState = !(this._userState eq null)
/** True once `_imState` has been assigned, i.e. it is no longer null. */
private[this] def haveIMState = !(this._imState eq null)
// Handler for the AquariumPropertiesLoaded message. No statements of its body are visible here.
111 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
// Applies `imEvent` to the IM state snapshot and yields the triple
// (creationTimeChanged, activationTimeChanged, roleChanged).
// Two paths are visible: an existing snapshot is replaced by the result of
// updatedWithEvent(imEvent, roleCheck), or a fresh one is built with IMStateSnapshot.initial(imEvent).
// NOTE(review): the condition selecting the path is not visible here — presumably haveIMState; confirm.
114 private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
118 activationTimeChanged,
119 roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
// Keep the snapshot returned by updatedWithEvent and report what changed.
121 this._imState = newState
122 (creationTimeChanged, activationTimeChanged, roleChanged)
// Other path: start a snapshot from this event.
124 this._imState = IMStateSnapshot.initial(imEvent)
// Creation is reported as changed only for a create-user event; activation and role always are.
126 imEvent.isCreateUser,
127 true, // first activation status is a change by default??
128 true // first role is a change by default??
134 * Creates the IMStateSnapshot by replaying the user's stored IM events in occurrence order.
// Replays every stored IM event of this user, in occurrence order, through
// _updateIMStateRoleHistory, which assigns _imState. Returns Unit.
136 private[this] def createInitialIMState(): Unit = {
137 val store = aquarium.imEventStore
// Role name handed to _updateIMStateRoleHistory as `roleCheck`; empty before the first event.
139 var _roleCheck = None: Option[String]
141 // this._userID is already set up
142 store.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
143 DEBUG("Replaying %s", imEvent)
145 val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
// Carry the latest role name over to the next replayed event.
146 _roleCheck = this._imState.roleHistory.lastRoleName
149 "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
150 creationTimeChanged, activationTimeChanged, roleChanged,
// NOTE(review): this dereferences _imState, which is still null if the store had no events;
// whether a guard surrounds this line is not visible here — confirm.
156 DEBUG("Initial %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
162 * Resource events are processed only if an IM state exists and it reports that the user has been created.
// Without an IM state the answer is false; otherwise it is whatever the
// IM state snapshot reports for hasBeenCreated.
private[this] def shouldProcessResourceEvents: Boolean =
  if(haveIMState) this._imState.hasBeenCreated else false
// Computes the user state with doMonthBillingUpTo for the billing month of the current time,
// stores it in _userState, and then replaces its role history when it differs from the IM state's.
// Reads _imState without checking it, so an IM state must already exist when this is called.
168 private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
// NOTE(review): both `.get` calls throw when the Option is empty. onProcessResourceEvent does the
// same role lookup with firstRoleName.getOrElse(aquarium.defaultInitialUserRole) — consider aligning.
169 val userCreationMillis = this._imState.userCreationMillis.get
170 val initialRole = this._imState.roleHistory.firstRole.get.name
172 val userStateBootstrap = UserStateBootstrap(
175 aquarium.initialUserAgreementForRole(initialRole, userCreationMillis),
176 aquarium.initialBalanceForRole(initialRole, userCreationMillis)
179 val now = TimeHelpers.nowMillis()
180 val userState = userStateComputations.doMonthBillingUpTo(
181 BillingMonthInfo.fromMillis(now),
184 aquarium.currentResourceTypesMap,
185 InitialUserActorSetup(),
186 stdUserStateStoreFunc,
190 this._userState = userState
192 // Final touch: Update role history
193 if(haveIMState && haveUserState) {
194 if(this._userState.roleHistory != this._imState.roleHistory) {
195 this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
// Creates the initial user state by delegating to loadUserStateAndUpdateRoleHistory().
// Two situations are logged as "cannot create": a missing IM state and a user that has not been
// created. `event` is only used in log messages in the lines visible here.
// NOTE(review): the guards and early exits around the two DEBUG calls are not visible here.
200 private[this] def createInitialUserState(event: InitializeUserState): Unit = {
202 // Should have been created from `createInitialIMState()`
203 DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
207 if(!this._imState.hasBeenCreated) {
208 // Cannot set the initial state!
209 DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
213 // We will also need this functionality when receiving IMEvents,
214 // so we place it in a method
215 loadUserStateAndUpdateRoleHistory()
218 DEBUG("Initial %s = %s", shortNameOfType[UserState], this._userState.toJsonString)
// Handler for InitializeUserState: records the user ID, then builds the IM state and, after it,
// the user state.
// The order matters: createInitialIMState() reads _userID, and createInitialUserState() reads _imState.
223 def onInitializeUserState(event: InitializeUserState): Unit = {
224 this._userID = event.userID
225 DEBUG("Got %s", event)
227 createInitialIMState()
228 createInitialUserState(event)
232 * Creates a new user state, taking into account the latest role history in IM state snapshot.
233 * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
// Derives a new UserState from the current one, carrying the role history
// currently held by the IM state snapshot and tagged with `stateChangeReason`.
// Neither _userState nor _imState is checked for null here.
private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
  // FIXME: Also update agreement
  val latestRoleHistory = this._imState.roleHistory
  this._userState.newWithRoleHistory(latestRoleHistory, stateChangeReason)
}
242 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
243 * When this method is called, we assume that all proper checks have been made and it
244 * is OK to proceed with the event processing.
// Handler for ProcessIMEvent. Visible steps, in order:
//  - an uninitialized branch that throws AquariumInternalError (its condition is not visible here);
//  - an event whose id equals that of the latest IM event in the IM state is logged as ignored;
//  - otherwise the IM state is updated through _updateIMStateRoleHistory, and the user state is
//    either reloaded (creation time changed) or only given the new role history.
246 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
247 val imEvent = processEvent.imEvent
250 // This is an error. Should have been initialized from somewhere ...
251 throw new AquariumInternalError("Got %s while uninitialized".format(processEvent))
254 if(this._imState.latestIMEvent.id == imEvent.id) {
255 // This happens when the actor is brought to life, then immediately initialized, and then
256 // sent the first IM event. But from the initialization procedure, this IM event will have
257 // already been loaded from DB!
258 INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
// The role check uses the last role name known before this event is applied.
264 val (creationTimeChanged,
265 activationTimeChanged,
266 roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
269 "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
270 creationTimeChanged, activationTimeChanged, roleChanged,
274 // Must also update user state if we know when in history the life of a user begins
275 if(creationTimeChanged) {
277 loadUserStateAndUpdateRoleHistory()
278 INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
// NOTE(review): the condition guarding this second path is not visible here.
280 // Just update role history
281 this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
282 INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
286 DEBUG("Updated %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
// Handler for ProcessResourceEvent. Visible steps, in order:
//  - events are logged as not processed when shouldProcessResourceEvents is false;
//  - an event whose id is already the latest one recorded in the user state is logged as ignored;
//  - an event arriving within the timestamp threshold is logged as not processed, after its
//    occurredMillis has been recorded;
//  - otherwise the user state is recomputed with doMonthBillingUpTo and the event's
//    occurredMillis is recorded.
// NOTE(review): the early exits after the "not processing"/"ignoring" logs are not visible here.
290 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
291 val rcEvent = event.rcEvent
293 if(!shouldProcessResourceEvents) {
294 // This means the user has not been created (at least, as far as Aquarium is concerned).
295 // So, we do not process any resource event
296 DEBUG("Not processing %s", rcEvent.toJsonString)
302 // Since the latest resource event per resource is recorded in the user state,
303 // we do not need to query the store. Just query the in-memory state.
304 // Note: This is a similar situation with the first IMEvent received right after the user
// NOTE(review): this dereferences _userState, while shouldProcessResourceEvents only checks the
// IM state — confirm _userState cannot be null at this point.
306 if(this._userState.isLatestResourceEventIDEqualTo(rcEvent.id)) {
307 INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
// NOTE(review): `dt` subtracts a stored event timestamp (occurredMillis) from the current
// wall-clock time, so it is not the gap between two arrivals — confirm this is intended.
313 val now = TimeHelpers.nowMillis()
314 val dt = now - this._latestResourceEventOccurredMillis
315 val belowThreshold = dt <= _timestampTheshold
318 this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
320 DEBUG("Below threshold (%s ms). Not processing %s", this._timestampTheshold, rcEvent.toJsonString)
// Inputs for the billing computation. `userCreationMillis.get` throws when the Option is empty.
326 val userID = this._userID
327 val userCreationMillis = this._imState.userCreationMillis.get
328 val initialRole = this._imState.roleHistory.firstRoleName.getOrElse(aquarium.defaultInitialUserRole)
329 val initialAgreement = aquarium.initialUserAgreementForRole(initialRole, userCreationMillis)
330 val initialCredits = aquarium.initialBalanceForRole(initialRole, userCreationMillis)
331 val userStateBootstrap = UserStateBootstrap(
337 val billingMonthInfo = BillingMonthInfo.fromMillis(now)
338 val currentResourcesMap = aquarium.currentResourceTypesMap
339 val calculationReason = RealtimeBillingCalculation(None, now)
340 val eventOccurredMillis = rcEvent.occurredMillis
342 // DEBUG("Using %s", currentResourceTypesMap.toJsonString)
344 this._userState = aquarium.userStateComputations.doMonthBillingUpTo(
346 // Take into account that the event may be out-of-sync.
347 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
348 now max eventOccurredMillis,
352 stdUserStateStoreFunc
355 this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
357 DEBUG("Updated %s = %s", shortClassNameOf(this._userState), this._userState.toJsonString)
// Handler for GetUserBalanceRequest: replies to `sender` with a GetUserBalanceResponse chosen
// from (haveIMState, haveUserState) and, when an IM state exists, its hasBeenActivated flag.
// Only the activated-with-user-state case returns a balance (Right with totalCredits); the
// others reply with a Left message and status 500 (inconsistent internal state) or 404.
// NOTE(review): most `case` headers are not visible here; the inline comments name each case.
361 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
362 val userID = event.userID
364 (haveIMState, haveUserState) match {
366 // (have IMState, have UserState)
367 this._imState.hasBeenActivated match {
369 // (have IMState, activated, have UserState)
370 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
373 // (have IMState, not activated, have UserState)
374 // Since we have user state, we should have been activated
375 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
379 // (have IMState, no UserState)
380 this._imState.hasBeenActivated match {
382 // (have IMState, activated, no UserState)
383 // Since we are activated, we should have some state.
384 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
386 // (have IMState, not activated, no UserState)
387 // The user is virtually unknown
388 sender ! GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
392 // (no IMState, have UserState)
393 // A bit ridiculous situation
394 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
396 case (false, false) ⇒
397 // (no IMState, no UserState)
398 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
// Handler for GetUserStateRequest: replies to `sender` with the current user state (Right), or
// with a Left message and status 404 naming event.userID.
// NOTE(review): the `case` headers are not visible here — presumably true/false on haveUserState.
402 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
403 haveUserState match {
405 sender ! GetUserStateResponse(Right(this._userState))
408 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
// Supplies the tag that DEBUG/INFO/WARN/ERROR place in square brackets at the start of each log line.
// NOTE(review): the body is not visible here — presumably it yields this._userID; confirm.
412 private[this] def D_userID = {
/**
 * Logs at debug level. The line is "[tag] - text", where the tag comes from
 * `D_userID` and the text is `fmt` formatted with `args`.
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(tag, text))
}
/**
 * Logs at info level. The line is "[tag] - text", where the tag comes from
 * `D_userID` and the text is `fmt` formatted with `args`.
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(tag, text))
}
/**
 * Logs at warn level. The line is "[tag] - text", where the tag comes from
 * `D_userID` and the text is `fmt` formatted with `args`.
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(tag, text))
}
/**
 * Logs at error level. The line is "[tag] - text", where the tag comes from
 * `D_userID` and the text is `fmt` formatted with `args`.
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text))
}
/**
 * Logs at error level together with the throwable `t`. The line is
 * "[tag] - text", where the tag comes from `D_userID` and the text is `fmt`
 * formatted with `args`.
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text), t)
}