2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.util.{shortClassNameOf, shortNameOfClass, shortNameOfType}
44 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
45 import gr.grnet.aquarium.computation.data.IMStateSnapshot
46 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
47 import gr.grnet.aquarium.computation.{BillingMonthInfo, UserStateBootstrappingData, UserState}
48 import gr.grnet.aquarium.util.date.TimeHelpers
49 import gr.grnet.aquarium.event.model.im.IMEventModel
50 import gr.grnet.aquarium.{AquariumException, Aquarium}
51 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
52 import gr.grnet.aquarium.computation.reason.{InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
56 * @author Christos KK Loverdos <loverdos@gmail.com>
59 class UserActor extends ReflectiveRoleableActor {
// Identifier of the user this actor serves. Starts as the placeholder "<?>" and is
// overwritten with the real ID in onInitializeUserState.
private[this] var _userID: String = "<?>"
// Latest IM (identity management) state snapshot. Left at its default value here;
// assigned by _updateIMStateRoleHistory as IM events are applied.
private[this] var _imState: IMStateSnapshot = _
// Latest computed user state. Left at its default value here; assigned by
// createUserState and refreshed in onProcessIMEvent when the role history changes.
private[this] var _userState: UserState = _

// NOTE(review): `Temporary` is the Akka supervision life cycle; presumably this means the
// actor is not restarted by its supervisor after a failure — confirm against the Akka
// version in use.
self.lifeCycle = Temporary
66 private[this] def _shutmedown(): Unit = {
68 UserActorCache.invalidate(_userID)
74 override protected def onThrowable(t: Throwable, message: AnyRef) = {
76 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
/**
 * The role under which this actor is known: always `UserActorRole`.
 */
def role = {
  UserActorRole
}
/**
 * Shortcut to the `Aquarium.Instance` object; looked up again on every call.
 */
private[this] def aquarium: Aquarium = {
  Aquarium.Instance
}
/**
 * Shortcut to the `userStateComputations` member of the current `aquarium` instance.
 */
private[this] def userStateComputations = {
  this.aquarium.userStateComputations
}
86 private[this] def _timestampTheshold = {
87 aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
90 private[this] def haveUserState = {
91 this._userState ne null
94 private[this] def haveIMState = {
98 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
101 def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
104 private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel): (Boolean, Boolean, String) = {
106 val currentRole = this._imState.roleHistory.lastRole.map(_.name).getOrElse(null)
107 // logger.debug("Current role = %s".format(currentRole))
109 if(imEvent.role != currentRole) {
110 // logger.debug("New role = %s".format(imEvent.role))
111 this._imState = this._imState.updateRoleHistoryWithEvent(imEvent)
114 val noUpdateReason = "Same role '%s'".format(currentRole)
115 // logger.debug(noUpdateReason)
116 (false, false, noUpdateReason)
119 this._imState = IMStateSnapshot.initial(imEvent)
125 * Creates the IMStateSnapshot and returns the number of updates it made to it.
127 private[this] def createIMState(event: InitializeUserState): Int = {
128 val userID = event.userID
129 val store = aquarium.imEventStore
133 store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
134 DEBUG("Replaying %s", imEvent)
136 val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
138 _updateCount = _updateCount + 1
139 DEBUG("Updated %s for role '%s'", shortNameOfType[IMStateSnapshot], imEvent.role)
141 DEBUG("Not updated %s due to: %s", shortNameOfType[IMStateSnapshot], noUpdateReason)
146 DEBUG("Computed %s = %s", shortNameOfType[IMStateSnapshot], this._imState)
148 DEBUG("Not computed %s", shortNameOfType[IMStateSnapshot])
154 * Resource events are processed only if the user has been activated.
156 private[this] def shouldProcessResourceEvents: Boolean = {
157 haveIMState && this._imState.hasBeenActivated
160 private[this] def createUserState(event: InitializeUserState): Unit = {
162 // Should have been created from `createIMState()`
163 DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
167 if(!this._imState.hasBeenActivated) {
168 // Cannot set the initial state!
169 DEBUG("Cannot create %s from %s, since user is inactive", shortNameOfType[UserState], event)
173 val userActivationMillis = this._imState.userActivationMillis.get
174 val initialRole = this._imState.roleHistory.firstRole.get.name
176 val userStateBootstrap = UserStateBootstrappingData(
178 userActivationMillis,
180 aquarium.initialAgreementForRole(initialRole, userActivationMillis),
181 aquarium.initialBalanceForRole(initialRole, userActivationMillis)
184 val userState = userStateComputations.doFullMonthlyBilling(
186 BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()),
187 aquarium.currentResourcesMap,
188 InitialUserStateSetup,
192 this._userState = userState
194 // Final touch: Update role history
195 if(haveIMState && haveUserState) {
196 // FIXME: Not satisfied with this redundant info
197 if(this._userState.roleHistory != this._imState.roleHistory) {
198 this._userState = newUserStateWithUpdatedRoleHistory(InitialUserActorSetup)
203 DEBUG("%s = %s", shortNameOfType[UserState], this._userState)
207 def onInitializeUserState(event: InitializeUserState): Unit = {
208 val userID = event.userID
209 this._userID = userID
210 DEBUG("Got %s", event)
213 createUserState(event)
217 * Creates a new user state, taking into account the latest role history in IM state snapshot.
218 * Having an IM state snapshot is a prerequisite, otherwise you get an exception; so check before you
221 private[this] def newUserStateWithUpdatedRoleHistory(stateChangeReason: UserStateChangeReason): UserState = {
222 this._userState.copy(
223 roleHistory = this._imState.roleHistory,
224 // FIXME: Also update agreement
225 stateChangeCounter = this._userState.stateChangeCounter + 1,
226 lastChangeReason = stateChangeReason
231 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
232 * When this method is called, we assume that all proper checks have been made and it
233 * is OK to proceed with the event processing.
235 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
236 val imEvent = processEvent.imEvent
239 // This is an error. Should have been initialized from somewhere ...
240 throw new AquariumException("Got %s while being uninitialized".format(processEvent))
243 if(this._imState.latestIMEvent.id == imEvent.id) {
244 // This happens when the actor is brought to life, then immediately initialized, and then
245 // sent the first IM event. But from the initialization procedure, this IM event will have
246 // already been loaded from DB!
247 INFO("Ignoring first %s just after %s birth", imEvent.toDebugString, shortClassNameOf(this))
252 val (updated, firstUpdate, noUpdateReason) = _updateIMStateRoleHistory(imEvent)
255 DEBUG("Updated %s = %s", shortClassNameOf(this._imState), this._imState)
257 // Must also update user state
258 if(shouldProcessResourceEvents) {
260 DEBUG("Also updating %s with new %s",
261 shortClassNameOf(this._userState),
262 shortClassNameOf(this._imState.roleHistory)
265 this._userState = newUserStateWithUpdatedRoleHistory(IMEventArrival(imEvent))
269 DEBUG("Not updating %s from %s due to: %s", shortNameOfType[IMStateSnapshot], imEvent, noUpdateReason)
275 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
276 val rcEvent = event.rcEvent
278 if(!shouldProcessResourceEvents) {
279 // This means the user has not been activated. So, we do not process any resource event
280 DEBUG("Not processing %s", rcEvent.toJsonString)
287 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
288 val userID = event.userID
290 (haveIMState, haveUserState) match {
292 // (have IMState, have UserState)
293 this._imState.hasBeenActivated match {
295 // (have IMState, activated, have UserState)
296 self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
299 // (have IMState, not activated, have UserState)
300 // Since we have user state, we should have been activated
301 self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
305 // (have IMState, no UserState)
306 this._imState.hasBeenActivated match {
308 // (have IMState, activated, no UserState)
309 // Since we are activated, we should have some state.
310 self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
312 // (have IMState, not activated, no UserState)
313 // The user is virtually unknown
314 self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
// (no IMState, have UserState)
319 // A bit ridiculous situation
320 self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
322 case (false, false) ⇒
323 // (no IMState, no UserState)
324 self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
328 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
329 haveUserState match {
331 self reply GetUserStateResponse(Right(this._userState))
334 self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)))
338 private[this] def D_userID = {
/**
 * Logs a message at DEBUG level, prefixed with the value of `D_userID`.
 *
 * The emitted line has the shape `[<D_userID>] - <message>`.
 *
 * @param fmt  a `String.format`-style pattern for the message
 * @param args the values substituted into `fmt`
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val userTag = D_userID
  val message = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(userTag, message))
}
/**
 * Logs a message at INFO level, prefixed with the value of `D_userID`.
 *
 * The emitted line has the shape `[<D_userID>] - <message>`.
 *
 * @param fmt  a `String.format`-style pattern for the message
 * @param args the values substituted into `fmt`
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val userTag = D_userID
  val message = fmt.format(args: _*)
  logger.info("[%s] - %s".format(userTag, message))
}
/**
 * Logs a message at WARN level, prefixed with the value of `D_userID`.
 *
 * The emitted line has the shape `[<D_userID>] - <message>`.
 *
 * @param fmt  a `String.format`-style pattern for the message
 * @param args the values substituted into `fmt`
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val userTag = D_userID
  val message = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(userTag, message))
}
/**
 * Logs a message at ERROR level, prefixed with the value of `D_userID`.
 *
 * The emitted line has the shape `[<D_userID>] - <message>`.
 *
 * @param fmt  a `String.format`-style pattern for the message
 * @param args the values substituted into `fmt`
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val userTag = D_userID
  val message = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, message))
}
/**
 * Logs a message together with a throwable at ERROR level, prefixed with the value
 * of `D_userID`.
 *
 * The emitted line has the shape `[<D_userID>] - <message>`; the throwable is handed
 * to the logger as a separate argument.
 *
 * @param t    the throwable to log alongside the message
 * @param fmt  a `String.format`-style pattern for the message
 * @param args the values substituted into `fmt`
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val userTag = D_userID
  val message = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, message), t)
}