Aquarium internal error
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41 import gr.grnet.aquarium.user._
42
43 import gr.grnet.aquarium.util.shortClassNameOf
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.actor.message.service.router._
46 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
47 import gr.grnet.aquarium.event.im.IMEventModel
48 import akka.config.Supervision.Temporary
49 import gr.grnet.aquarium.{AquariumInternalError, AquariumException, Configurator}
50
51
52 /**
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  */
56
57 class UserActor extends ReflectiveRoleableActor {
58   private[this] var _userID: String = _
59   private[this] var _userState: UserState = _
60
61   self.lifeCycle = Temporary
62
63   private[this] def _shutmedown(): Unit = {
64     if(_haveFullState) {
65       UserActorCache.invalidate(this._userID)
66     }
67
68     self.stop()
69   }
70
71   override protected def onThrowable(t: Throwable, message: AnyRef) = {
72     logChainOfCauses(t)
73     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
74
75     _shutmedown()
76   }
77
78   def role = UserActorRole
79
80   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
81 //  private[this] def _userId = _userState.userId
82
83   private[this] def _timestampTheshold =
84     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
85
86
87   private[this] def _haveFullState = {
88     (this._userID ne null) && (this._userState ne null)
89   }
90
91   private[this] def _havePartialState = {
92     (this._userID ne null) && (this._userState eq null)
93   }
94
95
96   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
97   }
98
99   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
100   }
101
102   private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
103     // FIXME: Implement based on the role
104     "default"
105   }
106
107   private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
108     this._userID = imEvent.userID
109
110     val store = _configurator.storeProvider.userStateStore
111     // try find user state. normally should ot exist
112     val latestUserStateOpt = store.findLatestUserStateByUserID(this._userID)
113     if(latestUserStateOpt.isDefined) {
114       logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
115         this._userID,
116         shortClassNameOf(imEvent),
117         imEvent.eventType))
118
119       return
120     }
121
122     val initialAgreementName = _getAgreementNameForNewUser(imEvent)
123     val newUserState    = DefaultUserStateComputations.createInitialUserState(
124       this._userID,
125       imEvent.occurredMillis,
126       imEvent.isActive,
127       0.0,
128       List(imEvent.role),
129       initialAgreementName)
130
131     this._userState = newUserState
132
133     // FIXME: If this fails, then the actor must be shut down.
134     store.insertUserState(newUserState)
135   }
136
137   private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
138     val now = TimeHelpers.nowMillis()
139
140     if(!_haveFullState) {
141       ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
142       _shutmedown()
143       return
144     }
145
146     this._userState = this._userState.modifyFromIMEvent(imEvent, now)
147   }
148
149   def onProcessSetUserID(event: ProcessSetUserID): Unit = {
150     this._userID = event.userID
151   }
152
153   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
154     val now = TimeHelpers.nowMillis()
155
156     val imEvent = event.imEvent
157     // If we already have a userID but it does not match the incoming userID, then this is an internal error
158     if(_havePartialState && (this._userID != imEvent.userID)) {
159       throw new AquariumInternalError(
160         "Got userID = %s but already have userID = %s".format(imEvent.userID, this._userID))
161     }
162
163     // If we get an IMEvent without having a user state, then we query for the latest user state.
164     if(!_haveFullState) {
165       val userStateOpt = _configurator.userStateStore.findLatestUserStateByUserID(this._userID)
166       this._userState = userStateOpt match {
167         case Some(userState) ⇒
168           userState
169
170         case None ⇒
171           val initialAgreementName = _getAgreementNameForNewUser(imEvent)
172           val initialUserState = DefaultUserStateComputations.createInitialUserState(
173             this._userID,
174             imEvent.occurredMillis,
175             imEvent.isActive,
176             0.0,
177             List(imEvent.role),
178             initialAgreementName)
179
180           DEBUG("Got initial state")
181           initialUserState
182       }
183     }
184
185     if(imEvent.isModifyUser && this._userState.isInitial) {
186       INFO("Got a '%s' but have not received '%s' yet", imEvent.eventType, IMEventModel.EventTypeNames.create)
187       return
188     }
189
190     if(imEvent.isCreateUser && !this._userState.isInitial) {
191       INFO("Got a '%s' but my state is not initial", imEvent.eventType)
192       return
193     }
194
195     this._userState = this._userState.modifyFromIMEvent(imEvent, now)
196
197     if(imEvent.isCreateUser) {
198       processCreateUser(imEvent)
199     } else if(imEvent.isModifyUser) {
200       processModifyUser(imEvent)
201     } else {
202       throw new AquariumException("Cannot interpret %s".format(imEvent))
203     }
204   }
205
206   def onRequestUserBalance(event: RequestUserBalance): Unit = {
207     val userId = event.userID
208     // FIXME: Implement threshold
209     self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
210   }
211
212   def onUserRequestGetState(event: UserRequestGetState): Unit = {
213     val userId = event.userID
214    // FIXME: implement
215     self reply UserResponseGetState(userId, this._userState)
216   }
217
218   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
219   }
220
221
222   private[this] def D_userID = {
223     if(this._userID eq null)
224       "<NOT INITIALIZED>" // We always get a userID first
225     else
226       if(this._userState eq null)
227         "%s, NO STATE".format(this._userID)
228       else
229         "%s".format(this._userID)
230   }
231
232   private[this] def DEBUG(fmt: String, args: Any*) =
233     logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
234
235   private[this] def INFO(fmt: String, args: Any*) =
236     logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
237
238   private[this] def WARN(fmt: String, args: Any*) =
239     logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
240
241   private[this] def ERROR(fmt: String, args: Any*) =
242     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
243
244   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
245       logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
246 }