WIP: IMEventModel end-to-end chain
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41 import gr.grnet.aquarium.user._
42
43 import gr.grnet.aquarium.util.shortClassNameOf
44 import gr.grnet.aquarium.util.chainOfCauses
45 import gr.grnet.aquarium.util.date.TimeHelpers
46 import gr.grnet.aquarium.actor.message.service.router._
47 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
48 import gr.grnet.aquarium.event.im.IMEventModel
49 import akka.config.Supervision.Temporary
50 import gr.grnet.aquarium.{AquariumException, Configurator}
51
52
53 /**
54  *
55  * @author Christos KK Loverdos <loverdos@gmail.com>
56  */
57
58 class UserActor extends ReflectiveRoleableActor {
59   private[this] var _userID: String = _
60   private[this] var _userState: UserState = _
61
62   self.lifeCycle = Temporary
63
64   private[this] def _shutmedown(): Unit = {
65     if(_haveFullState) {
66       UserActorCache.invalidate(this._userID)
67     }
68
69     self.stop()
70   }
71
72   override protected def onThrowable(t: Throwable, message: AnyRef) = {
73     logChainOfCauses(t)
74     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
75
76     _shutmedown()
77   }
78
79   def role = UserActorRole
80
81   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
82 //  private[this] def _userId = _userState.userId
83
84   private[this] def _timestampTheshold =
85     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
86
87
88   private[this] def _haveFullState = {
89     (this._userID ne null) && (this._userState ne null)
90   }
91
92   private[this] def _havePartialState = {
93     (this._userID ne null) && (this._userState eq null)
94   }
95
96
97   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
98   }
99
100   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
101   }
102
103   private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
104     // FIXME: Implement based on the role
105     "default"
106   }
107
108   private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
109     this._userID = imEvent.userID
110
111     val store = _configurator.storeProvider.userStateStore
112     // try find user state. normally should ot exist
113     val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
114     if(latestUserStateOpt.isDefined) {
115       logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
116         this._userID,
117         shortClassNameOf(imEvent),
118         imEvent.eventType))
119
120       return
121     }
122
123     val initialAgreementName = _getAgreementNameForNewUser(imEvent)
124     val newUserState    = DefaultUserStateComputations.createInitialUserState(
125       this._userID,
126       imEvent.occurredMillis,
127       imEvent.isActive,
128       0.0,
129       List(imEvent.role),
130       initialAgreementName)
131
132     this._userState = newUserState
133
134     // FIXME: If this fails, then the actor must be shut down.
135     store.insertUserState(newUserState)
136   }
137
138   private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
139     val now = TimeHelpers.nowMillis()
140
141     if(!_haveFullState) {
142       ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
143       _shutmedown()
144       return
145     }
146
147     this._userState = this._userState.modifyFromIMEvent(imEvent, now)
148   }
149
150   def onProcessSetUserID(event: ProcessSetUserID): Unit = {
151     this._userID = event.userID
152   }
153
154   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
155     val now = TimeHelpers.nowMillis()
156
157     val imEvent = event.imEvent
158     // If we already have a userID but it does not match the incoming userID, then this is an internal error
159     if(_havePartialState && (this._userID != imEvent.userID)) {
160       throw new AquariumException("Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
161     }
162
163     // If we get an IMEvent without having a user state, then we query for the latest user state.
164     if(!_haveFullState) {
165       val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
166       this._userState = userStateOpt match {
167         case Some(userState) ⇒
168           userState
169
170         case None ⇒
171           val initialAgreementName = _getAgreementNameForNewUser(imEvent)
172           val initialUserState = DefaultUserStateComputations.createInitialUserState(
173             this._userID,
174             imEvent.occurredMillis,
175             imEvent.isActive,
176             0.0,
177             List(imEvent.role),
178             initialAgreementName)
179
180           DEBUG("Got initial state")
181           initialUserState
182       }
183     }
184
185     if(imEvent.isModifyUser && this._userState.isInitial) {
186       INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
187       return
188     }
189
190     if(imEvent.isCreateUser && !this._userState.isInitial) {
191       INFO("Got a '%s' but my state is not initial", imEvent.eventType)
192       return
193     }
194
195     this._userState = this._userState.modifyFromIMEvent(imEvent, now)
196
197     if(imEvent.isCreateUser) {
198       processCreateUser(imEvent)
199     } else if(imEvent.isModifyUser) {
200       processModifyUser(imEvent)
201     } else {
202       throw new AquariumException("Cannot interpret %s".format(imEvent))
203     }
204   }
205
206   def onRequestUserBalance(event: RequestUserBalance): Unit = {
207     val userId = event.userID
208     // FIXME: Implement threshold
209     self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
210   }
211
212   def onUserRequestGetState(event: UserRequestGetState): Unit = {
213     val userId = event.userID
214    // FIXME: implement
215     self reply UserResponseGetState(userId, this._userState)
216   }
217
218   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
219   }
220
221
222   private[this] def D_userID = {
223     if(this._userID eq null)
224       "<NOT INITIALIZED>" // We always get a userID first
225     else
226       if(this._userState eq null)
227         "%s, NO STATE".format(this._userID)
228       else
229         "%s".format(this._userID)
230   }
231
232   private[this] def DEBUG(fmt: String, args: Any*) =
233     logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
234
235   private[this] def INFO(fmt: String, args: Any*) =
236     logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
237
238   private[this] def WARN(fmt: String, args: Any*) =
239     logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
240
241   private[this] def ERROR(fmt: String, args: Any*) =
242     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
243
244   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
245       logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
246 }