2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.user.actor
38 import gr.grnet.aquarium.actor._
39 import gr.grnet.aquarium.Configurator
40 import gr.grnet.aquarium.processor.actor._
41 import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
42 import gr.grnet.aquarium.user._
43 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
45 import gr.grnet.aquarium.util.{DateUtils, Loggable}
46 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
47 import gr.grnet.aquarium.util.date.TimeHelpers
48 import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
53 * @author Christos KK Loverdos <loverdos@gmail.com>
// Actor that keeps the in-memory state of a single user. It mixes in logging,
// accounting and date helpers and mutates the three private fields below from
// its message handlers.
56 class UserActor extends AquariumActor
57 with Loggable with Accounting with DateUtils {
// Id of the user this actor serves; assigned when a UserActorInitWithUserId message arrives.
59 private[this] var _userId: String = _
// Current user state; starts as null (default initializer) and is checked against null before rebuilding.
61 private[this] var _userState: UserState = _
// Value read from Configurator.Keys.user_state_timestamp_threshold, defaulting to 10000 when absent.
// NOTE(review): unit is presumably milliseconds — confirm. The "Theshold" spelling is used consistently in this file.
63 private[this] var _timestampTheshold: Long = _
// Reports this actor's role as UserActorRole.
65 def role = UserActorRole
// Declared as a def, so each use re-reads Configurator.MasterConfigurator.
67 private[this] def _configurator: Configurator = Configurator.MasterConfigurator
// Handles a "create user" event. Looks the user up in the user state store by the
// event's userId and, on the paths visible here:
//   - logs a warning when a state is already stored,
//   - logs an error when the lookup failed,
//   - otherwise builds a first UserState from the default agreement.
// NOTE(review): several original lines of this method are not visible in this view
// (the case header for the "not found" path, the else branch, and some arguments to
// createFirstUserState); confirm the exact branching against the full file.
69 private[this] def processCreateUser(event: UserEvent): Unit = {
70 val userId = event.userId
71 DEBUG("Creating user from state %s", event)
72 val usersDB = _configurator.storeProvider.userStateStore
73 usersDB.findUserStateByUserId(userId) match {
74 case Just(userState) ⇒
// The message is formatted here and then formatted again inside WARN with no arguments.
// NOTE(review): a literal '%' in userState's string form would be read as a format specifier — confirm this is acceptable.
75 WARN("User already created, state = %s".format(userState))
76 case failed @ Failed(e, m) ⇒
// Logs the exception class name, the exception message and the Failed message.
77 ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
79 // OK. Create a default UserState and store it
80 val now = TimeHelpers.nowMillis
81 val agreementOpt = Policy.policy.findAgreement(DSLAgreement.DefaultAgreementName)
// Without a default agreement only an error is logged on this branch.
83 if(agreementOpt.isEmpty) {
84 ERROR("No default agreement found. Cannot initialize user state")
86 this._userState = DefaultUserStateComputations.createFirstUserState(
89 DSLAgreement.DefaultAgreementName)
// NOTE(review): the log says "stored", but no store call is visible in this view — confirm the state is actually persisted.
91 DEBUG("Created and stored %s", this._userState)
// Asks the wallet entry store for previous entries of this actor's user, for the
// given resource (by name) and resource instance id.
// NOTE(review): the meaning of the trailing Some(false) argument is defined by
// walletEntryStore.findPreviousEntry and is not visible here — confirm.
96 private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
97 val walletDB = _configurator.storeProvider.walletEntryStore
98 walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
// Handles a "modify user" event: builds a new ActiveStateSnapshot from the event's
// active flag, stamped with the current time, and swaps it into _userState via copy.
// NOTE(review): _userState is dereferenced without a null check here — confirm it is
// always initialized before a modify event can arrive.
102 private[this] def processModifyUser(event: UserEvent): Unit = {
103 val now = TimeHelpers.nowMillis
104 val newActive = ActiveStateSnapshot(event.isStateActive, now)
// The message is formatted here and passed to DEBUG with no further arguments.
106 DEBUG("New active status = %s".format(newActive))
108 this._userState = this._userState.copy( activeStateSnapshot = newActive )
111 * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
// Dispatches a user event: create events go to processCreateUser, modify events to
// processModifyUser. No branch for other kinds of event is visible in this view.
113 private[this] def processUserEvent(event: UserEvent): Unit = {
114 if(event.isCreateUser) {
115 processCreateUser(event)
116 } else if(event.isModifyUser) {
117 processModifyUser(event)
122 * Tries to make sure that the internal user state exists.
124 * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
// Checks whether _userState is still null and triggers a rebuild covering the range
// from the state's oldest snapshot time up to the current time.
// NOTE(review): the statement(s) between the null check and the rebuildState call are
// not visible in this view, so how the two relate (which branch runs what) must be
// confirmed against the full file.
127 private[this] def ensureUserState(): Unit = {
128 if (_userState == null)
131 rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
135 * Replay the event log for all events that affect the user state, starting
136 * from the provided time instant.
// Convenience overload: rebuilds state from `from` up to an upper bound obtained from
// oneYearAhead(new Date(), new Date(Long.MaxValue)), converted to epoch milliseconds
// with getTime.
// NOTE(review): oneYearAhead is not defined in this view (presumably from the DateUtils mixin) — confirm what the second argument means.
138 def rebuildState(from: Long): Unit =
139 rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
142 * Replay the event log for all events that affect the user state.
// Rebuilds _userState by replaying stored data for this actor's user in three passes
// (user events, resource events, wallet entries), then logs how many items were seen
// and how long the rebuild took.
//   from - lower time bound, passed to the stores and to the replay functions
//   to   - upper time bound, passed to the replay functions only
144 def rebuildState(from: Long, to: Long): Unit = {
// Wall-clock start, used only for the elapsed-time figure in the final log line.
145 val start = System.currentTimeMillis()
// NOTE(review): the statement guarded by this null check is not visible in this view — presumably it creates a blank state; confirm.
146 if (_userState == null)
149 //Rebuild state from user events
150 val usersDB = _configurator.storeProvider.userEventStore
// The query is by user id only; `from`/`to` are handed to replayUserEvents instead.
151 val userEvents = usersDB.findUserEventsByUserId(_userId)
152 val numUserEvents = userEvents.size
153 _userState = replayUserEvents(_userState, userEvents, from, to)
155 //Rebuild state from resource events
156 val eventsDB = _configurator.storeProvider.resourceEventStore
157 val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
// Resource events are fetched and counted, but their replay below is commented out,
// so they do not change _userState here even though they appear in the logged totals.
158 val numResourceEvents = resourceEvents.size
159 // _userState = replayResourceEvents(_userState, resourceEvents, from, to)
161 //Rebuild state from wallet entries
162 val wallet = _configurator.storeProvider.walletEntryStore
163 val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
164 val numWalletEntries = walletEnties.size
165 _userState = replayWalletEntries(_userState, walletEnties, from, to)
// Summary: total count, per-source counts, and elapsed milliseconds.
167 INFO(("Rebuilt state from %d events (%d user events, " +
168 "%d resource events, %d wallet entries) in %d msec").format(
169 numUserEvents + numResourceEvents + numWalletEntries,
170 numUserEvents, numResourceEvents, numWalletEntries,
171 System.currentTimeMillis() - start))
175 * Create an empty state for a user
// Replaces _userState with the first user state that DefaultUserStateComputations
// creates for this actor's user id.
// NOTE(review): the result type is inferred and the end of the body is not visible in this view — confirm what, if anything, callers receive.
177 def createBlankState = {
178 this._userState = DefaultUserStateComputations.createFirstUserState(this._userId)
182 * Replay user events on the provided user state
// Replays user events on top of initState and returns the resulting UserState.
//   initState - state to start from
//   events    - user events to apply
//   from, to  - time bounds (the disabled code filters on from <= occurredMillis < to)
// Every statement visible in this view is commented out.
// NOTE(review): the live expression that produces the return value is not visible here —
// confirm whether the method currently returns initState unchanged.
184 def replayUserEvents(initState: UserState, events: List[UserEvent],
185 from: Long, to: Long): UserState = {
186 // var act = initState.active
187 // var rol = initState.roles
189 // .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
193 // data = e.isStateActive, snapshotTime = e.occurredMillis)
194 // // TODO: Implement the following
195 // //_userState.agreement = _userState.agreement.copy(
196 // // data = e.newAgreement, e.occurredMillis)
198 // rol = rol.copy(data = e.roles,
199 // snapshotTime = e.occurredMillis)
201 // initState.copy(active = act, roles = rol)
207 * Replay wallet entries on the provided user state
// Replays wallet entries on top of initState and returns the resulting UserState.
//   initState - state to start from
//   events    - wallet entries to apply
//   from, to  - time bounds (the disabled code filters on from <= occurredMillis < to)
// Every statement visible in this view is commented out; the disabled code summed entry
// values into the credit amount and moved the snapshot time to the newest entry.
// NOTE(review): the live expression that produces the return value is not visible here —
// confirm whether the method currently returns initState unchanged.
209 def replayWalletEntries(initState: UserState, events: List[WalletEntry],
210 from: Long, to: Long): UserState = {
211 // var cred = initState.credits
213 // .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
216 // val newVal = cred.creditAmount + w.value
217 // cred = cred.copy(data = newVal)
219 // if (!events.isEmpty) {
220 // val snapTime = events.map{e => e.occurredMillis}.max
221 // cred = cred.copy(snapshotTime = snapTime)
223 // initState.copy(credits = cred)
228 * Persist current user state
// Persists the current user state: first asks the store to delete the state kept under
// this user id, then stores _userState and inspects the result.
// NOTE(review): the delete is issued before the store, so if storing fails the earlier
// record may already be gone — confirm this ordering is intended.
230 private[this] def saveUserState(): Unit = {
231 _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
232 _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
// Success: the stored record is evaluated and discarded (the method returns Unit).
233 case Just(record) => record
234 case NoVal => ERROR("Unknown error saving state")
// NOTE(review): the case header that binds `a` and `e` is not visible in this view — presumably a Failed(...) pattern; confirm.
236 ERROR("Saving state failed: %s error was: %s".format(a,e));
// Message handler of the actor. Each case below reacts to one message type.
// NOTE(review): some statements and closing braces of the original are not visible in
// this view, so the exact branch structure inside each case must be confirmed against
// the full file.
240 protected def receive: Receive = {
// Configuration loaded: read the timestamp threshold, using 10000 when the key is absent.
241 case m @ AquariumPropertiesLoaded(props) ⇒
242 this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
243 INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
// Initialization: remember which user this actor serves.
245 case m @ UserActorInitWithUserId(userId) ⇒
246 this._userId = userId
247 DEBUG("Actor starting, loading state")
// Resource event: a user id mismatch is logged as an error; the processing calls
// visible here are commented out.
250 case m @ ProcessResourceEvent(resourceEvent) ⇒
251 if(resourceEvent.userId != this._userId) {
252 ERROR("Received %s but my userId = %s".format(m, this._userId))
255 // calcWalletEntries()
256 //processResourceEvent(resourceEvent, true)
// User event: a user id mismatch is logged as an error; processUserEvent applies the event.
259 case m @ ProcessUserEvent(userEvent) ⇒
260 if(userEvent.userId != this._userId) {
261 ERROR("Received %s but my userId = %s".format(m, this._userId))
264 processUserEvent(userEvent)
// Balance request: the check fires when the newest snapshot is more than 60 * 1000
// (compared against a currentTimeMillis difference) behind the current time; the reply
// carries the credit amount from the current credits snapshot and echoes the userId
// taken from the request.
// NOTE(review): the statement guarded by the staleness check is not visible in this view.
267 case m @ RequestUserBalance(userId, timestamp) ⇒
268 if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
270 // calcWalletEntries()
272 self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
// State request: a user id mismatch is logged as an error (throwing is still a TODO);
// the reply carries the current _userState.
274 case m @ UserRequestGetState(userId, timestamp) ⇒
275 if(this._userId != userId) {
276 ERROR("Received %s but my userId = %s".format(m, this._userId))
277 // TODO: throw an exception here
280 ERROR("FIXME: Should have properly computed the user state")
282 self reply UserResponseGetState(userId, this._userState)
// Lifecycle hook overridden from the actor base; logs that the actor is stopping.
// NOTE(review): the log mentions saving state, but the statement after it is not
// visible in this view — presumably a call to saveUserState(); confirm.
286 override def postStop {
287 DEBUG("Stopping, saving state")
// Logs at debug level. `fmt` is formatted with `args`, then prefixed with
// "UserActor[<userId>]: ".
// NOTE(review): `fmt` is always run through format, even with no args, so a literal '%'
// in an already-formatted message would be treated as a format specifier.
291 private[this] def DEBUG(fmt: String, args: Any*) =
292 logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
// Logs at info level. `fmt` is formatted with `args`, then prefixed with
// "UserActor[<userId>]: ".
// NOTE(review): `fmt` is always run through format, even with no args, so a literal '%'
// in an already-formatted message would be treated as a format specifier.
294 private[this] def INFO(fmt: String, args: Any*) =
295 logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
// Logs at warn level. `fmt` is formatted with `args`, then prefixed with
// "UserActor[<userId>]: ".
// NOTE(review): `fmt` is always run through format, even with no args, so a literal '%'
// in an already-formatted message would be treated as a format specifier.
297 private[this] def WARN(fmt: String, args: Any*) =
298 logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
// Logs at error level. `fmt` is formatted with `args`, then prefixed with
// "UserActor[<userId>]: ".
// NOTE(review): `fmt` is always run through format, even with no args, so a literal '%'
// in an already-formatted message would be treated as a format specifier.
300 private[this] def ERROR(fmt: String, args: Any*) =
301 logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))