WIP: IMEventModel end-to-end chain
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import java.util.Date
41 import com.ckkloverdos.maybe.{Failed, NoVal, Just}
42
43 import gr.grnet.aquarium.actor._
44 import gr.grnet.aquarium.Configurator
45 import gr.grnet.aquarium.user._
46
47 import gr.grnet.aquarium.util.Loggable
48 import gr.grnet.aquarium.util.date.TimeHelpers
49 import gr.grnet.aquarium.logic.accounting.RoleAgreements
50 import gr.grnet.aquarium.messaging.AkkaAMQP
51 import gr.grnet.aquarium.actor.message.config.user.UserActorInitWithUserId
52 import gr.grnet.aquarium.actor.message.service.dispatcher._
53 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
54 import gr.grnet.aquarium.event.im.IMEventModel
55 import gr.grnet.aquarium.event.{WalletEntry}
56
57
58 /**
59  *
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  */
62
63 class UserActor extends AquariumActor
64 with AkkaAMQP
65 with ReflectiveAquariumActor
66 with Loggable {
67   @volatile
68   private[this] var _userId: String = _
69   @volatile
70   private[this] var _userState: UserState = _
71   @volatile
72   private[this] var _timestampTheshold: Long = _
73
74   private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
75
76   def role = UserActorRole
77
78   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
79
80   /**
81    * Replay the event log for all events that affect the user state.
82    */
83   def rebuildState(from: Long, to: Long): Unit = {
84     val start = TimeHelpers.nowMillis()
85     if(_userState == null)
86       createBlankState
87
88     //Rebuild state from user events
89     val usersDB = _configurator.storeProvider.imEventStore
90     val userEvents = usersDB.findIMEventsByUserId(_userId)
91     val numUserEvents = userEvents.size
92     _userState = replayIMEvents(_userState, userEvents, from, to)
93
94     //Rebuild state from resource events
95     val eventsDB = _configurator.storeProvider.resourceEventStore
96     val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
97     val numResourceEvents = resourceEvents.size
98     //    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
99
100     //Rebuild state from wallet entries
101     val wallet = _configurator.storeProvider.walletEntryStore
102     val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
103     val numWalletEntries = walletEnties.size
104     _userState = replayWalletEntries(_userState, walletEnties, from, to)
105
106     INFO(("Rebuilt state from %d events (%d user events, " +
107       "%d resource events, %d wallet entries) in %d msec").format(
108       numUserEvents + numResourceEvents + numWalletEntries,
109       numUserEvents, numResourceEvents, numWalletEntries,
110       TimeHelpers.nowMillis() - start))
111   }
112
113   /**
114    * Create an empty state for a user
115    */
116   def createBlankState = {
117     this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
118   }
119
120   /**
121    * Replay user events on the provided user state
122    */
123   def replayIMEvents(initState: UserState, events: List[IMEventModel],
124                        from: Long, to: Long): UserState = {
125     initState
126   }
127
128
129   /**
130    * Replay wallet entries on the provided user state
131    */
132   def replayWalletEntries(initState: UserState, events: List[WalletEntry],
133                           from: Long, to: Long): UserState = {
134     initState
135   }
136
137   /**
138    * Persist current user state
139    */
140   private[this] def saveUserState(): Unit = {
141     _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
142       case Just(record) => record
143       case NoVal => ERROR("Unknown error saving state")
144       case Failed(e) =>
145         ERROR("Saving state failed: %s".format(e));
146     }
147   }
148
149   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
150     this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
151     INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
152   }
153
154   def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
155     this._userId = event.userId
156     DEBUG("Actor starting, loading state")
157   }
158
159   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
160     val resourceEvent = event.rcEvent
161     if(resourceEvent.userID != this._userId) {
162       ERROR("Received %s but my userId = %s".format(event, this._userId))
163     } else {
164       //ensureUserState()
165       //        calcWalletEntries()
166       //processResourceEvent(resourceEvent, true)
167     }
168   }
169
170   private[this] def processCreateUser(event: IMEventModel): Unit = {
171     val userId = event.userID
172     DEBUG("Creating user from state %s", event)
173     val usersDB = _configurator.storeProvider.userStateStore
174     usersDB.findUserStateByUserId(userId) match {
175       case Just(userState) ⇒
176         WARN("User already created, state = %s".format(userState))
177       case failed@Failed(e) ⇒
178         ERROR("[%s] %s", e.getClass.getName, e.getMessage)
179       case NoVal ⇒
180         val agreement = RoleAgreements.agreementForRole(event.role)
181         DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
182
183         this._userState = DefaultUserStateComputations.createInitialUserState(
184           userId,
185           event.occurredMillis,
186           event.isActive, 0.0, List(event.role), agreement.name)
187         saveUserState
188         DEBUG("Created and stored %s", this._userState)
189     }
190   }
191
192   private[this] def processModifyUser(event: IMEventModel): Unit = {
193     val now = TimeHelpers.nowMillis()
194     val newActive = ActiveStateSnapshot(event.isStateActive, now)
195
196     DEBUG("New active status = %s".format(newActive))
197
198     this._userState = this._userState.copy(activeStateSnapshot = newActive)
199   }
200
201   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
202     val userEvent = event.imEvent
203     if(userEvent.userID != this._userId) {
204       ERROR("Received %s but my userId = %s".format(userEvent, this._userId))
205     } else {
206       if(userEvent.isCreateUser) {
207         processCreateUser(userEvent)
208       } else if(userEvent.isModifyUser) {
209         processModifyUser(userEvent)
210       }
211     }
212   }
213
214   def onRequestUserBalance(event: RequestUserBalance): Unit = {
215     val userId = event.userId
216     val timestamp = event.timestamp
217
218     if(TimeHelpers.nowMillis() - _userState.newestSnapshotTime > 60 * 1000) {
219       //        calcWalletEntries()
220     }
221     self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
222   }
223
224   def onUserRequestGetState(event: UserRequestGetState): Unit = {
225     val userId = event.userId
226     if(this._userId != userId) {
227       ERROR("Received %s but my userId = %s".format(event, this._userId))
228       // TODO: throw an exception here
229     } else {
230       // FIXME: implement
231       ERROR("FIXME: Should have properly computed the user state")
232       //      ensureUserState()
233       self reply UserResponseGetState(userId, this._userState)
234     }
235   }
236
237   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
238   }
239
240   override def postStop {
241     DEBUG("Actor[%s] stopping, saving state", self.uuid)
242     saveUserState
243   }
244
245   override def preRestart(reason: Throwable) {
246     ERROR(reason, "preRestart: Actor[%s]", self.uuid)
247   }
248
249   override def postRestart(reason: Throwable) {
250     ERROR(reason, "postRestart: Actor[%s]", self.uuid)
251   }
252
253   private[this] def DEBUG(fmt: String, args: Any*) =
254     logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
255
256   private[this] def INFO(fmt: String, args: Any*) =
257     logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
258
259   private[this] def WARN(fmt: String, args: Any*) =
260     logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
261
262   private[this] def ERROR(fmt: String, args: Any*) =
263     logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)))
264
265   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
266       logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args: _*)), t)
267 }