fd1f19eb46726a766ee94b971b2293e29b32b48c
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41 import gr.grnet.aquarium.user._
42
43 import gr.grnet.aquarium.util.shortClassNameOf
44 import gr.grnet.aquarium.util.chainOfCauses
45 import gr.grnet.aquarium.util.date.TimeHelpers
46 import gr.grnet.aquarium.actor.message.service.router._
47 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
48 import gr.grnet.aquarium.event.im.IMEventModel
49 import akka.config.Supervision.Temporary
50 import akka.actor.PoisonPill
51 import gr.grnet.aquarium.{AquariumException, Configurator}
52 import com.ckkloverdos.convert.ConverterException
53
54
55 /**
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 class UserActor extends ReflectiveRoleableActor {
61   private[this] var _userID: String = _
62   private[this] var _userState: UserState = _
63
64   self.lifeCycle = Temporary
65
66   private[this] def _shutmedown(): Unit = {
67     if(_haveFullState) {
68       UserActorCache.invalidate(this._userID)
69     }
70
71     self ! PoisonPill
72   }
73
74   override protected def onThrowable(t: Throwable, message: AnyRef) = {
75     ERROR("Oops!\n", chainOfCauses(t).map("!! " + _) mkString "\n")
76     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
77
78     _shutmedown()
79   }
80
81   def role = UserActorRole
82
83   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
84 //  private[this] def _userId = _userState.userId
85
86   private[this] def _timestampTheshold =
87     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
88
89
90   private[this] def _haveFullState = {
91     (this._userID ne null) && (this._userState ne null)
92   }
93
94   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
95   }
96
97   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
98   }
99
100   private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = {
101     // FIXME: Implement based on the role
102     "default"
103   }
104
105   private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
106     val userID = imEvent.userID
107     this._userID = userID
108
109     val store = _configurator.storeProvider.userStateStore
110     // try find user state. normally should ot exist
111     val latestUserStateOpt = store.findLatestUserStateByUserID(userID)
112     if(latestUserStateOpt.isDefined) {
113       logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
114         userID,
115         shortClassNameOf(imEvent),
116         imEvent.eventType))
117
118       return
119     }
120
121     val initialAgreementName = _computeAgreementForNewUser(imEvent)
122     val newUserState    = DefaultUserStateComputations.createInitialUserState(
123       userID,
124       imEvent.occurredMillis,
125       imEvent.isActive,
126       0.0,
127       List(imEvent.role),
128       initialAgreementName)
129
130     this._userState = newUserState
131
132     // FIXME: If this fails, then the actor must be shut down.
133     store.insertUserState(newUserState)
134   }
135
136   private[this] def processModifyUser(imEvent: IMEventModel): Unit = {
137     val now = TimeHelpers.nowMillis()
138
139     if(!_haveFullState) {
140       ERROR("Got %s(%s) but have no state. Shutting down", shortClassNameOf(imEvent), imEvent.eventType)
141       _shutmedown()
142       return
143     }
144
145     this._userState = this._userState.modifyFromIMEvent(imEvent, now)
146   }
147
148   def onProcessSetUserID(event: ProcessSetUserID): Unit = {
149     this._userID = event.userID
150   }
151
152   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
153     val imEvent = event.imEvent
154     if(imEvent.isCreateUser) {
155       processCreateUser(imEvent)
156     } else if(imEvent.isModifyUser) {
157       processModifyUser(imEvent)
158     } else {
159       throw new AquariumException("Cannot interpret %s".format(imEvent))
160     }
161   }
162
163   def onRequestUserBalance(event: RequestUserBalance): Unit = {
164     val userId = event.userID
165     // FIXME: Implement threshold
166     self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
167   }
168
169   def onUserRequestGetState(event: UserRequestGetState): Unit = {
170     val userId = event.userID
171    // FIXME: implement
172     self reply UserResponseGetState(userId, this._userState)
173   }
174
175   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
176   }
177
178
179   private[this] def D_userID = {
180     if(this._userID eq null)
181       "<NOT INITIALIZED>" // We always get a userID first
182     else
183       if(this._userState eq null)
184         "%s, NO STATE".format(this._userID)
185       else
186         "%s".format(this._userID)
187   }
188
189   private[this] def DEBUG(fmt: String, args: Any*) =
190     logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
191
192   private[this] def INFO(fmt: String, args: Any*) =
193     logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
194
195   private[this] def WARN(fmt: String, args: Any*) =
196     logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
197
198   private[this] def ERROR(fmt: String, args: Any*) =
199     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
200
201   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
202       logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
203 }