2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
45 import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
46 import gr.grnet.aquarium.computation.data.IMStateSnapshot
47 import gr.grnet.aquarium.event.model.im.IMEventModel
48 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
52 * @author Christos KK Loverdos <loverdos@gmail.com>
// NOTE(review): the embedded line numbers in this listing are not contiguous, so some
// source lines (closing braces, `case` patterns, parts of method bodies) are not visible
// here. The comments below describe only what the visible lines show.
//
// Actor that keeps an IMStateSnapshot (`_imState`) built from IM events: the snapshot is
// rebuilt by replaying events from the IM event store on InitializeUserState and is
// updated on ProcessIMEvent. It also declares handlers for balance/state requests and
// for resource events.
55 class UserActor extends ReflectiveRoleableActor {
// Current IM state snapshot. `= _` leaves it at the type's default value until an IM
// event sets it (presumably null, since IMStateSnapshot looks like a reference type — confirm).
56 private[this] var _imState: IMStateSnapshot = _
57 // private[this] var _userState: UserState = _
58 // private[this] var _newUserState: NewUserState = _
// Akka life cycle for this actor.
// NOTE(review): `Temporary` presumably means the supervisor does not restart the actor
// after a failure — confirm against the Akka version in use.
60 self.lifeCycle = Temporary
62 // private[this] def _userID = this._newUserState.userID
// Shutdown helper. Only a commented-out cache invalidation is visible in this listing;
// any live statements of the body are not shown.
63 private[this] def _shutmedown(): Unit = {
64 // if(_haveUserState) {
65 // UserActorCache.invalidate(_userID)
// Failure hook overridden from a supertype; receives the throwable and the message that
// was being handled. The visible body contains only a commented-out error log; any live
// statements are not shown in this listing.
71 override protected def onThrowable(t: Throwable, message: AnyRef) = {
73 // ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
// Role of this actor: UserActorRole.
78 def role = UserActorRole
// Shorthand for Aquarium.Instance.
80 private[this] def aquarium: Aquarium = Aquarium.Instance
// Reads the `user_state_timestamp_threshold` long property from the Aquarium properties,
// passing 10000 to `getOr` as the fallback (presumably used when the property is missing
// or cannot be read as a long — confirm).
// NOTE(review): the unit of the threshold is not visible here (looks like milliseconds —
// confirm). The identifier is misspelled ("Theshold"); left as-is.
82 private[this] def _timestampTheshold =
83 aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
86 // private[this] def _haveUserState = {
87 // this._newUserState ne null
// Predicate telling whether `_imState` has been set.
// NOTE(review): the body is not visible in this listing; presumably a `ne null` check
// like the commented-out `_haveUserState` above — confirm.
90 private[this] def _haveIMState = {
// Handler for AquariumPropertiesLoaded; no body statements are visible in this listing.
94 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
// Handler for ActorProviderConfigured; no body statements are visible in this listing.
97 def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
// Rebuilds `_imState` for `userID` by passing a callback to
// `replayIMEventsInOccurrenceOrder` on the IM event store; the callback folds each
// replayed event into the snapshot. Logs every replayed event and the resulting snapshot
// at debug level.
100 private[this] def createIMState(userID: String): Unit = {
101 val store = aquarium.imEventStore
102 // TODO: Optimization: Since IMState only records roles, we should incrementally
103 // TODO: build it only for those IMEvents that changed the role.
104 store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
105 logger.debug("Replaying %s".format(imEvent))
// Fold the event into the snapshot: one arm starts from IMStateSnapshot.initial, the
// other extends the current snapshot via copyWithEvent. The `case` patterns selecting
// between the two arms are not visible here (presumably null vs. non-null `_imState` —
// confirm).
107 val newState = this._imState match {
109 IMStateSnapshot.initial(imEvent)
112 currentState.copyWithEvent(imEvent)
115 this._imState = newState
118 logger.debug("Recomputed %s".format(this._imState))
// Handler for InitializeUserState: logs the message at debug level and rebuilds the IM
// state for the user id it carries.
121 def onInitializeUserState(event: InitializeUserState): Unit = {
122 logger.debug("Got %s".format(event))
123 createIMState(event.userID)
// Returns an agreement name for the user described by `imEvent`.
// NOTE(review): the body is not visible in this listing; the FIXME below says the result
// should eventually depend on the role.
126 private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
127 // FIXME: Implement based on the role
132 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
133 * When this method is called, we assume that all proper checks have been made and it
134 * is OK to proceed with the event processing.
// Visible steps: remember whether an IM state already existed; log (and apparently skip)
// an event whose id equals that of the latest event already held by the snapshot;
// otherwise either extend the current snapshot with copyWithEvent or create the initial
// snapshot from the event.
// NOTE(review): the if/else lines tying these steps together are not visible in this
// listing — presumably keyed on `hadIMState`; confirm.
136 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
137 val imEvent = processEvent.imEvent
138 val hadIMState = _haveIMState
141 if(this._imState.latestIMEvent.id == imEvent.id) {
142 // This happens when the actor is brought to life, then immediately initialized, and then
143 // sent the first IM event. But from the initialization procedure, this IM event will have
144 // already been loaded from DB!
145 logger.debug("Ignoring first %s after birth".format(imEvent.toDebugString))
149 this._imState = this._imState.copyWithEvent(imEvent)
151 this._imState = IMStateSnapshot.initial(imEvent)
154 // DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
// Handler for GetUserBalanceRequest. The visible lines only read the user id; the reply
// is commented out, so the lines shown send no response.
157 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
158 val userId = event.userID
160 // self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
// Handler for GetUserStateRequest. The visible lines only read the user id; the reply
// is commented out, so the lines shown send no response.
163 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
164 val userId = event.userID
166 // self reply GetUserStateResponse(userId, Right(this._userState))
// Handler for ProcessResourceEvent: logs the carried resource event as JSON at info
// level. No further processing is visible in this listing.
169 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
170 val rcEvent = event.rcEvent
172 logger.info("Got\n{}", rcEvent.toJsonString)
// Everything below is commented-out code: a `D_userID` helper and DEBUG/INFO/WARN/ERROR
// logging wrappers that prefixed each message with "UserActor[<user id>]".
176 // private[this] def D_userID = {
177 // if(this._newUserState eq null)
178 // if(this._imState eq null)
179 // "<NOT INITIALIZED>"
181 // this._imState.latestIMEvent.userID
183 // this._newUserState.userID
186 // private[this] def DEBUG(fmt: String, args: Any*) =
187 // logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
189 // private[this] def INFO(fmt: String, args: Any*) =
190 // logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
192 // private[this] def WARN(fmt: String, args: Any*) =
193 // logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
195 // private[this] def ERROR(fmt: String, args: Any*) =
196 // logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
198 // private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
199 // logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)