774860713a629fa237068fcc8b8062fcf0b5942f
[aquarium] / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user.actor
37
38 import gr.grnet.aquarium.actor._
39 import gr.grnet.aquarium.Configurator
40 import gr.grnet.aquarium.processor.actor._
41 import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
42 import gr.grnet.aquarium.user._
43 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
44 import java.util.Date
45 import gr.grnet.aquarium.util.{DateUtils, Loggable}
46 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
47 import gr.grnet.aquarium.util.date.TimeHelpers
48 import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
49
50
51 /**
52  *
53  * @author Christos KK Loverdos <loverdos@gmail.com>
54  */
55
56 class UserActor extends AquariumActor
57   with Loggable with Accounting with DateUtils {
58   @volatile
59   private[this] var _userId: String = _
60   @volatile
61   private[this] var _userState: UserState = _
62   @volatile
63   private[this] var _timestampTheshold: Long = _
64
65   def role = UserActorRole
66
67   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
68
69   private[this] def processCreateUser(event: UserEvent): Unit = {
70     val userId = event.userId
71     DEBUG("Creating user from state %s", event)
72     val usersDB = _configurator.storeProvider.userStateStore
73     usersDB.findUserStateByUserId(userId) match {
74       case Just(userState) ⇒
75         WARN("User already created, state = %s".format(userState))
76       case failed @ Failed(e, m) ⇒
77         ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
78       case NoVal ⇒
79         // OK. Create a default UserState and store it
80         val now = TimeHelpers.nowMillis
81         val agreementOpt = Policy.policy.findAgreement(DSLAgreement.DefaultAgreementName)
82
83         if(agreementOpt.isEmpty) {
84           ERROR("No default agreement found. Cannot initialize user state")
85         } else {
86           this._userState = DefaultUserStateComputations.createFirstUserState(
87             userId,
88             event.occurredMillis,
89             DSLAgreement.DefaultAgreementName)
90           saveUserState
91           DEBUG("Created and stored %s", this._userState)
92         }
93     }
94   }
95
96   private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
97     val walletDB = _configurator.storeProvider.walletEntryStore
98     walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
99   }
100
101
102   private[this] def processModifyUser(event: UserEvent): Unit = {
103     val now = TimeHelpers.nowMillis
104     val newActive = ActiveStateSnapshot(event.isStateActive, now)
105
106     DEBUG("New active status = %s".format(newActive))
107
108     this._userState = this._userState.copy( activeStateSnapshot = newActive )
109   }
110   /**
111    * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
112    */
113   private[this] def processUserEvent(event: UserEvent): Unit = {
114     if(event.isCreateUser) {
115       processCreateUser(event)
116     } else if(event.isModifyUser) {
117       processModifyUser(event)
118     }
119   }
120
121   /**
122    * Tries to makes sure that the internal user state exists.
123    *
124    * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
125    *
126    */
127   private[this] def ensureUserState(): Unit = {
128     if (_userState == null)
129       rebuildState(0)
130     else
131       rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
132   }
133
134   /**
135    * Replay the event log for all events that affect the user state, starting
136    * from the provided time instant.
137    */
138   def rebuildState(from: Long): Unit =
139     rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
140
141   /**
142    * Replay the event log for all events that affect the user state.
143    */
144   def rebuildState(from: Long, to: Long): Unit = {
145     val start = System.currentTimeMillis()
146     if (_userState == null)
147       createBlankState
148
149     //Rebuild state from user events
150     val usersDB = _configurator.storeProvider.userEventStore
151     val userEvents = usersDB.findUserEventsByUserId(_userId)
152     val numUserEvents = userEvents.size
153     _userState = replayUserEvents(_userState, userEvents, from, to)
154
155     //Rebuild state from resource events
156     val eventsDB = _configurator.storeProvider.resourceEventStore
157     val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
158     val numResourceEvents = resourceEvents.size
159 //    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
160
161     //Rebuild state from wallet entries
162     val wallet = _configurator.storeProvider.walletEntryStore
163     val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
164     val numWalletEntries = walletEnties.size
165     _userState = replayWalletEntries(_userState, walletEnties, from, to)
166
167     INFO(("Rebuilt state from %d events (%d user events, " +
168       "%d resource events, %d wallet entries) in %d msec").format(
169       numUserEvents + numResourceEvents + numWalletEntries,
170       numUserEvents, numResourceEvents, numWalletEntries,
171       System.currentTimeMillis() - start))
172   }
173
174   /**
175    * Create an empty state for a user
176    */
177   def createBlankState = {
178     this._userState = DefaultUserStateComputations.createFirstUserState(this._userId)
179   }
180
181   /**
182    * Replay user events on the provided user state
183    */
184   def replayUserEvents(initState: UserState, events: List[UserEvent],
185                        from: Long, to: Long): UserState = {
186 //    var act = initState.active
187 //    var rol = initState.roles
188 //    events
189 //      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
190 //      .foreach {
191 //        e =>
192 //          act = act.copy(
193 //            data = e.isStateActive, snapshotTime = e.occurredMillis)
194 //          // TODO: Implement the following
195 //          //_userState.agreement = _userState.agreement.copy(
196 //          //  data = e.newAgreement, e.occurredMillis)
197 //
198 //          rol = rol.copy(data = e.roles,
199 //            snapshotTime = e.occurredMillis)
200 //    }
201 //    initState.copy(active = act, roles = rol)
202     initState
203   }
204
205
206   /**
207    * Replay wallet entries on the provided user state
208    */
209   def replayWalletEntries(initState: UserState, events: List[WalletEntry],
210                           from: Long, to: Long): UserState = {
211 //    var cred = initState.credits
212 //    events
213 //      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
214 //      .foreach {
215 //        w =>
216 //          val newVal = cred.creditAmount + w.value
217 //          cred = cred.copy(data = newVal)
218 //    }
219 //    if (!events.isEmpty) {
220 //      val snapTime = events.map{e => e.occurredMillis}.max
221 //      cred = cred.copy(snapshotTime = snapTime)
222 //    }
223 //    initState.copy(credits = cred)
224     initState
225   }
226
227   /**
228    * Persist current user state
229    */
230   private[this] def saveUserState(): Unit = {
231     _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
232     _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
233       case Just(record) => record
234       case NoVal => ERROR("Unknown error saving state")
235       case Failed(e, a) =>
236         ERROR("Saving state failed: %s error was: %s".format(a,e));
237     }
238   }
239
240   protected def receive: Receive = {
241     case m @ AquariumPropertiesLoaded(props) ⇒
242       this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
243       INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
244
245     case m @ UserActorInitWithUserId(userId) ⇒
246       this._userId = userId
247       DEBUG("Actor starting, loading state")
248       ensureUserState()
249
250     case m @ ProcessResourceEvent(resourceEvent) ⇒
251       if(resourceEvent.userId != this._userId) {
252         ERROR("Received %s but my userId = %s".format(m, this._userId))
253       } else {
254         //ensureUserState()
255 //        calcWalletEntries()
256         //processResourceEvent(resourceEvent, true)
257       }
258
259     case m @ ProcessUserEvent(userEvent) ⇒
260       if(userEvent.userId != this._userId) {
261         ERROR("Received %s but my userId = %s".format(m, this._userId))
262       } else {
263         ensureUserState()
264         processUserEvent(userEvent)
265       }
266
267     case m @ RequestUserBalance(userId, timestamp) ⇒
268       if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
269       {
270 //        calcWalletEntries()
271       }
272       self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
273
274     case m @ UserRequestGetState(userId, timestamp) ⇒
275       if(this._userId != userId) {
276         ERROR("Received %s but my userId = %s".format(m, this._userId))
277         // TODO: throw an exception here
278       } else {
279         // FIXME: implement
280         ERROR("FIXME: Should have properly computed the user state")
281         ensureUserState()
282         self reply UserResponseGetState(userId, this._userState)
283       }
284   }
285
286   override def postStop {
287     DEBUG("Stopping, saving state")
288     //saveUserState
289   }
290
291   private[this] def DEBUG(fmt: String, args: Any*) =
292     logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
293
294   private[this] def INFO(fmt: String, args: Any*) =
295       logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
296
297   private[this] def WARN(fmt: String, args: Any*) =
298     logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
299
300   private[this] def ERROR(fmt: String, args: Any*) =
301     logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
302 }