2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
44 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
45 import gr.grnet.aquarium.util.date.TimeHelpers
46 import gr.grnet.aquarium.event.model.im.IMEventModel
47 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
48 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
49 import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
50 import gr.grnet.aquarium.AquariumInternalError
51 import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
52 import gr.grnet.aquarium.computation.BillingMonthInfo
53 import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
57 * @author Christos KK Loverdos <loverdos@gmail.com>
60 class UserActor extends ReflectiveRoleableActor {
61 private[this] var _userID: String = "<?>"
62 private[this] var _imState: IMStateSnapshot = _
63 private[this] var _userState: UserState = _
64 private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
66 self.lifeCycle = Temporary
68 private[this] def _shutmedown(): Unit = {
70 UserActorCache.invalidate(_userID)
76 override protected def onThrowable(t: Throwable, message: AnyRef) = {
77 LogHelpers.logChainOfCauses(logger, t)
78 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
83 def role = UserActorRole
85 private[this] def userStateComputations = aquarium.userStateComputations
87 private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ {
88 aquarium.userStateStore.insertUserState(userState)
91 private[this] def _timestampTheshold = {
92 aquarium.userStateTimestampThreshold
95 private[this] def haveUserState = {
96 this._userState ne null
99 private[this] def haveIMState = {
100 this._imState ne null
103 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
106 def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
109 private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
113 activationTimeChanged,
114 roleChanged) = this._imState.updatedWithEvent(imEvent, roleCheck)
116 this._imState = newState
117 (creationTimeChanged, activationTimeChanged, roleChanged)
119 this._imState = IMStateSnapshot.initial(imEvent)
121 imEvent.isCreateUser,
122 true, // first activation status is a change by default??
123 true // first role is a change by default??
129 * Creates the IMStateSnapshot and returns the number of updates it made to it.
131 private[this] def createInitialIMState(): Unit = {
132 val store = aquarium.imEventStore
134 var _roleCheck = None: Option[String]
136 // this._userID is already set up
137 store.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
138 DEBUG("Replaying %s", imEvent)
140 val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
141 _roleCheck = this._imState.roleHistory.lastRoleName
144 "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
145 creationTimeChanged, activationTimeChanged, roleChanged,
150 DEBUG("Initial %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
155 * Resource events are processed only if the user has been activated.
157 private[this] def shouldProcessResourceEvents: Boolean = {
158 haveIMState && this._imState.hasBeenCreated
161 private[this] def loadUserStateAndUpdateRoleHistory(): Unit = {
162 val userCreationMillis = this._imState.userCreationMillis.get
163 val initialRole = this._imState.roleHistory.firstRole.get.name
165 val userStateBootstrap = UserStateBootstrap(
169 aquarium.initialAgreementForRole(initialRole, userCreationMillis),
170 aquarium.initialBalanceForRole(initialRole, userCreationMillis)
173 val now = TimeHelpers.nowMillis()
174 val userState = userStateComputations.doMonthBillingUpTo(
175 BillingMonthInfo.fromMillis(now),
178 aquarium.currentResourcesMap,
179 InitialUserActorSetup(),
180 stdUserStateStoreFunc,
184 this._userState = userState
186 // Final touch: Update role history
187 if(haveIMState && haveUserState) {
188 if(this._userState.roleHistory != this._imState.roleHistory) {
189 this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup())
194 private[this] def createInitialUserState(event: InitializeUserState): Unit = {
196 // Should have been created from `createIMState()`
197 DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
201 if(!this._imState.hasBeenCreated) {
202 // Cannot set the initial state!
203 DEBUG("Cannot create %s from %s, since user has not been created", shortNameOfType[UserState], event)
207 // We will also need this functionality when receiving IMEvents,
208 // so we place it in a method
209 loadUserStateAndUpdateRoleHistory()
212 DEBUG("Initial %s = %s", shortNameOfType[UserState], this._userState.toJsonString)
217 def onInitializeUserState(event: InitializeUserState): Unit = {
218 this._userID = event.userID
219 DEBUG("Got %s", event)
221 createInitialIMState()
222 createInitialUserState(event)
226 * Creates a new user state, taking into account the latest role history in IM state snapshot.
227 * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
230 private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
231 // FIXME: Also update agreement
232 this._userState.newWithRoleHistory(this._imState.roleHistory, stateChangeReason)
236 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
237 * When this method is called, we assume that all proper checks have been made and it
238 * is OK to proceed with the event processing.
240 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
241 val imEvent = processEvent.imEvent
244 // This is an error. Should have been initialized from somewhere ...
245 throw new AquariumInternalError("Got %s while uninitialized".format(processEvent))
248 if(this._imState.latestIMEvent.id == imEvent.id) {
249 // This happens when the actor is brought to life, then immediately initialized, and then
250 // sent the first IM event. But from the initialization procedure, this IM event will have
251 // already been loaded from DB!
252 INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
258 val (creationTimeChanged,
259 activationTimeChanged,
260 roleChanged) = _updateIMStateRoleHistory(imEvent, this._imState.roleHistory.lastRoleName)
263 "(creationTimeChanged, activationTimeChanged, roleChanged)=(%s, %s, %s) using %s",
264 creationTimeChanged, activationTimeChanged, roleChanged,
268 // Must also update user state if we know when in history the life of a user begins
269 if(creationTimeChanged) {
271 loadUserStateAndUpdateRoleHistory()
272 INFO("Loaded %s due to %s", shortNameOfType[UserState], imEvent)
274 // Just update role history
275 this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
276 INFO("Updated %s due to %s", shortNameOfType[UserState], imEvent)
280 DEBUG("Updated %s = %s", shortNameOfType[IMStateSnapshot], this._imState.toJsonString)
284 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
285 val rcEvent = event.rcEvent
287 if(!shouldProcessResourceEvents) {
288 // This means the user has not been created (at least, as far as Aquarium is concerned).
289 // So, we do not process any resource event
290 DEBUG("Not processing %s", rcEvent.toJsonString)
296 // Since the latest resource event per resource is recorded in the user state,
297 // we do not need to query the store. Just query the in-memory state.
298 // Note: This is a similar situation with the first IMEvent received right after the user
300 if(this._userState.isLatestResourceEventIDEqualTo(rcEvent.id)) {
301 INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
307 val now = TimeHelpers.nowMillis()
308 val dt = now - this._latestResourceEventOccurredMillis
309 val belowThreshold = dt <= _timestampTheshold
312 this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
314 DEBUG("Below threshold (%s ms). Not processing %s", this._timestampTheshold, rcEvent.toJsonString)
320 val userID = this._userID
321 val userCreationMillis = this._imState.userCreationMillis.get
322 val initialRole = this._imState.roleHistory.firstRoleName.getOrElse(aquarium.defaultInitialUserRole)
323 val initialAgreement = aquarium.initialAgreementForRole(initialRole, userCreationMillis)
324 val initialCredits = aquarium.initialBalanceForRole(initialRole, userCreationMillis)
325 val userStateBootstrap = UserStateBootstrap(
332 val billingMonthInfo = BillingMonthInfo.fromMillis(now)
333 val currentResourcesMap = aquarium.currentResourcesMap
334 val calculationReason = RealtimeBillingCalculation(None, now)
335 val eventOccurredMillis = rcEvent.occurredMillis
337 // DEBUG("Using %s", currentResourcesMap.toJsonString)
339 this._userState = aquarium.userStateComputations.doMonthBillingUpTo(
341 // Take into account that the event may be out-of-sync.
342 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
343 now max eventOccurredMillis,
347 stdUserStateStoreFunc
350 this._latestResourceEventOccurredMillis = event.rcEvent.occurredMillis
352 DEBUG("Updated %s = %s", shortClassNameOf(this._userState), this._userState.toJsonString)
356 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
357 val userID = event.userID
359 (haveIMState, haveUserState) match {
361 // (have IMState, have UserState)
362 this._imState.hasBeenActivated match {
364 // (have IMState, activated, have UserState)
365 self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
368 // (have IMState, not activated, have UserState)
369 // Since we have user state, we should have been activated
370 self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
374 // (have IMState, no UserState)
375 this._imState.hasBeenActivated match {
377 // (have IMState, activated, no UserState)
378 // Since we are activated, we should have some state.
379 self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
381 // (have IMState, not activated, no UserState)
382 // The user is virtually unknown
383 self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
387 // (no IMState, have UserState)
388 // A bit ridiculous situation
389 self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
391 case (false, false) ⇒
392 // (no IMState, no UserState)
393 self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
397 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
398 haveUserState match {
400 self reply GetUserStateResponse(Right(this._userState))
403 self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)))
407 private[this] def D_userID = {
411 private[this] def DEBUG(fmt: String, args: Any*) =
412 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
414 private[this] def INFO(fmt: String, args: Any*) =
415 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
417 private[this] def WARN(fmt: String, args: Any*) =
418 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
420 private[this] def ERROR(fmt: String, args: Any*) =
421 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
423 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
424 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)