2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
41 import com.ckkloverdos.maybe.{Failed, NoVal, Just}
43 import gr.grnet.aquarium.actor._
44 import gr.grnet.aquarium.Configurator
45 import gr.grnet.aquarium.user._
47 import gr.grnet.aquarium.util.Loggable
48 import gr.grnet.aquarium.util.date.TimeHelpers
49 import gr.grnet.aquarium.logic.accounting.RoleAgreements
50 import gr.grnet.aquarium.messaging.AkkaAMQP
51 import gr.grnet.aquarium.actor.message.config.user.UserActorInitWithUserId
52 import gr.grnet.aquarium.actor.message.service.dispatcher._
53 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
54 import gr.grnet.aquarium.event.im.IMEventModel
55 import gr.grnet.aquarium.event.{WalletEntry}
60 * @author Christos KK Loverdos <loverdos@gmail.com>
63 class UserActor extends AquariumActor
65 with ReflectiveAquariumActor
68 private[this] var _userId: String = _
70 private[this] var _userState: UserState = _
72 private[this] var _timestampTheshold: Long = _
74 private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
76 def role = UserActorRole
78 private[this] def _configurator: Configurator = Configurator.MasterConfigurator
81 * Replay the event log for all events that affect the user state.
83 def rebuildState(from: Long, to: Long): Unit = {
84 val start = TimeHelpers.nowMillis()
85 if(_userState == null)
88 //Rebuild state from user events
89 val usersDB = _configurator.storeProvider.imEventStore
90 val userEvents = usersDB.findIMEventsByUserId(_userId)
91 val numUserEvents = userEvents.size
92 _userState = replayIMEvents(_userState, userEvents, from, to)
94 //Rebuild state from resource events
95 val eventsDB = _configurator.storeProvider.resourceEventStore
96 val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
97 val numResourceEvents = resourceEvents.size
98 // _userState = replayResourceEvents(_userState, resourceEvents, from, to)
100 //Rebuild state from wallet entries
101 val wallet = _configurator.storeProvider.walletEntryStore
102 val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
103 val numWalletEntries = walletEnties.size
104 _userState = replayWalletEntries(_userState, walletEnties, from, to)
106 INFO(("Rebuilt state from %d events (%d user events, " +
107 "%d resource events, %d wallet entries) in %d msec").format(
108 numUserEvents + numResourceEvents + numWalletEntries,
109 numUserEvents, numResourceEvents, numWalletEntries,
110 TimeHelpers.nowMillis() - start))
114 * Create an empty state for a user
116 def createBlankState = {
117 this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
121 * Replay user events on the provided user state
123 def replayIMEvents(initState: UserState, events: List[IMEventModel],
124 from: Long, to: Long): UserState = {
130 * Replay wallet entries on the provided user state
132 def replayWalletEntries(initState: UserState, events: List[WalletEntry],
133 from: Long, to: Long): UserState = {
138 * Persist current user state
140 private[this] def saveUserState(): Unit = {
141 _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
142 case Just(record) => record
143 case NoVal => ERROR("Unknown error saving state")
145 ERROR("Saving state failed: %s".format(e));
149 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
150 this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
151 INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
154 def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
155 this._userId = event.userId
156 DEBUG("Actor starting, loading state")
159 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
160 val resourceEvent = event.rcEvent
161 if(resourceEvent.userID != this._userId) {
162 ERROR("Received %s but my userId = %s".format(event, this._userId))
165 // calcWalletEntries()
166 //processResourceEvent(resourceEvent, true)
170 private[this] def processCreateUser(event: IMEventModel): Unit = {
171 val userId = event.userID
172 DEBUG("Creating user from state %s", event)
173 val usersDB = _configurator.storeProvider.userStateStore
174 usersDB.findUserStateByUserId(userId) match {
175 case Just(userState) ⇒
176 WARN("User already created, state = %s".format(userState))
177 case failed@Failed(e) ⇒
178 ERROR("[%s] %s", e.getClass.getName, e.getMessage)
180 val agreement = RoleAgreements.agreementForRole(event.role)
181 DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
183 this._userState = DefaultUserStateComputations.createInitialUserState(
185 event.occurredMillis,
186 event.isActive, 0.0, List(event.role), agreement.name)
188 DEBUG("Created and stored %s", this._userState)
192 private[this] def processModifyUser(event: IMEventModel): Unit = {
193 val now = TimeHelpers.nowMillis()
194 val newActive = ActiveStateSnapshot(event.isStateActive, now)
196 DEBUG("New active status = %s".format(newActive))
198 this._userState = this._userState.copy(activeStateSnapshot = newActive)
201 def onProcessIMEvent(event: ProcessIMEvent): Unit = {
202 val userEvent = event.imEvent
203 if(userEvent.userID != this._userId) {
204 ERROR("Received %s but my userId = %s".format(userEvent, this._userId))
206 if(userEvent.isCreateUser) {
207 processCreateUser(userEvent)
208 } else if(userEvent.isModifyUser) {
209 processModifyUser(userEvent)
214 def onRequestUserBalance(event: RequestUserBalance): Unit = {
215 val userId = event.userId
216 val timestamp = event.timestamp
218 if(TimeHelpers.nowMillis() - _userState.newestSnapshotTime > 60 * 1000) {
219 // calcWalletEntries()
221 self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
224 def onUserRequestGetState(event: UserRequestGetState): Unit = {
225 val userId = event.userId
226 if(this._userId != userId) {
227 ERROR("Received %s but my userId = %s".format(event, this._userId))
228 // TODO: throw an exception here
231 ERROR("FIXME: Should have properly computed the user state")
233 self reply UserResponseGetState(userId, this._userState)
237 def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
240 override def postStop {
241 DEBUG("Actor[%s] stopping, saving state", self.uuid)
245 override def preRestart(reason: Throwable) {
246 ERROR(reason, "preRestart: Actor[%s]", self.uuid)
249 override def postRestart(reason: Throwable) {
250 ERROR(reason, "postRestart: Actor[%s]", self.uuid)
253 private[this] def DEBUG(fmt: String, args: Any*) =
254 logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
256 private[this] def INFO(fmt: String, args: Any*) =
257 logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
259 private[this] def WARN(fmt: String, args: Any*) =
260 logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
262 private[this] def ERROR(fmt: String, args: Any*) =
263 logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
265 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
266 logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)), t)