Add TODO for future optimization
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import akka.config.Supervision.Temporary
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
45 import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
46 import gr.grnet.aquarium.computation.data.IMStateSnapshot
47 import gr.grnet.aquarium.event.model.im.IMEventModel
48 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
49
50 /**
51  *
52  * @author Christos KK Loverdos <loverdos@gmail.com>
53  */
54
55 class UserActor extends ReflectiveRoleableActor {
56   private[this] var _imState: IMStateSnapshot = _
57 //  private[this] var _userState: UserState = _
58 //  private[this] var _newUserState: NewUserState = _
59
60   self.lifeCycle = Temporary
61
62 //  private[this] def _userID = this._newUserState.userID
63   private[this] def _shutmedown(): Unit = {
64 //    if(_haveUserState) {
65 //      UserActorCache.invalidate(_userID)
66 //    }
67
68     self.stop()
69   }
70
71   override protected def onThrowable(t: Throwable, message: AnyRef) = {
72     logChainOfCauses(t)
73 //    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
74
75     _shutmedown()
76   }
77
78   def role = UserActorRole
79
80   private[this] def aquarium: Aquarium = Aquarium.Instance
81
82   private[this] def _timestampTheshold =
83     aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
84
85
86 //  private[this] def _haveUserState = {
87 //    this._newUserState ne null
88 //  }
89
90   private[this] def _haveIMState = {
91     this._imState ne null
92   }
93
94   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
95   }
96
97   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
98   }
99
100   private[this] def createIMState(userID: String): Unit = {
101     val store = aquarium.imEventStore
102     // TODO: Optimization: Since IMState only records roles, we should incrementally
103     // TODO:               built it only for those IMEvents that changed the role.
104     store.replayIMEventsInOccurrenceOrder(userID) { imEvent ⇒
105       logger.debug("Replaying %s".format(imEvent))
106
107       val newState = this._imState match {
108         case null ⇒
109           IMStateSnapshot.initial(imEvent)
110
111         case currentState ⇒
112           currentState.copyWithEvent(imEvent)
113       }
114
115       this._imState = newState
116     }
117
118     logger.debug("Recomputed %s".format(this._imState))
119   }
120
121   def onInitializeUserState(event: InitializeUserState): Unit = {
122     logger.debug("Got %s".format(event))
123     createIMState(event.userID)
124   }
125
126   private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
127     // FIXME: Implement based on the role
128     "default"
129   }
130
131   /**
132    * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
133    * When this method is called, we assume that all proper checks have been made and it
134    * is OK to proceed with the event processing.
135    */
136   def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
137     val imEvent = processEvent.imEvent
138     val hadIMState = _haveIMState
139
140     if(hadIMState) {
141       if(this._imState.latestIMEvent.id == imEvent.id) {
142         // This happens when the actor is brought to life, then immediately initialized, and then
143         // sent the first IM event. But from the initialization procedure, this IM event will have
144         // already been loaded from DB!
145         logger.debug("Ignoring first %s after birth".format(imEvent.toDebugString))
146         return
147       }
148
149       this._imState = this._imState.copyWithEvent(imEvent)
150     } else {
151       this._imState = IMStateSnapshot.initial(imEvent)
152     }
153
154 //    DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
155   }
156
157   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
158     val userId = event.userID
159     // FIXME: Implement
160 //    self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
161   }
162
163   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
164     val userId = event.userID
165    // FIXME: Implement
166 //    self reply GetUserStateResponse(userId, Right(this._userState))
167   }
168
169   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
170     val rcEvent = event.rcEvent
171
172     logger.info("Got\n{}", rcEvent.toJsonString)
173   }
174
175
176 //  private[this] def D_userID = {
177 //    if(this._newUserState eq null)
178 //      if(this._imState eq null)
179 //        "<NOT INITIALIZED>"
180 //      else
181 //        this._imState.latestIMEvent.userID
182 //    else
183 //      this._newUserState.userID
184 //  }
185 //
186 //  private[this] def DEBUG(fmt: String, args: Any*) =
187 //    logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
188 //
189 //  private[this] def INFO(fmt: String, args: Any*) =
190 //    logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
191 //
192 //  private[this] def WARN(fmt: String, args: Any*) =
193 //    logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
194 //
195 //  private[this] def ERROR(fmt: String, args: Any*) =
196 //    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
197 //
198 //  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
199 //    logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
200 }