77ebfb9564fea552982eebe24c25b91092bb2c24
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package user
39
40 import gr.grnet.aquarium.actor._
41
42 import gr.grnet.aquarium.util.shortClassNameOf
43 import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
44 import akka.config.Supervision.Temporary
45 import gr.grnet.aquarium.Configurator
46 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
47 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
48 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
49 import gr.grnet.aquarium.computation.data.IMStateSnapshot
50 import gr.grnet.aquarium.computation.UserState
51 import gr.grnet.aquarium.event.model.im.IMEventModel
52
53 /**
54  *
55  * @author Christos KK Loverdos <loverdos@gmail.com>
56  */
57
58 class UserActor extends ReflectiveRoleableActor {
59   private[this] var _imState: IMStateSnapshot = _
60   private[this] var _userState: UserState = _
61
62   self.lifeCycle = Temporary
63
64   private[this] def _userID = this._userState.userID
65   private[this] def _shutmedown(): Unit = {
66     if(_haveUserState) {
67       UserActorCache.invalidate(_userID)
68     }
69
70     self.stop()
71   }
72
73   override protected def onThrowable(t: Throwable, message: AnyRef) = {
74     logChainOfCauses(t)
75     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
76
77     _shutmedown()
78   }
79
80   def role = UserActorRole
81
82   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
83
84   private[this] def _timestampTheshold =
85     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
86
87
88   private[this] def _haveUserState = {
89     this._userState ne null
90   }
91
92   private[this] def _haveIMState = {
93     this._imState ne null
94   }
95
96   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
97   }
98
99   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
100   }
101
102   private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
103     // FIXME: Implement based on the role
104     "default"
105   }
106
107   def onProcessIMEvent(event: ProcessIMEvent): Unit = {
108     val now = TimeHelpers.nowMillis()
109
110     val imEvent = event.imEvent
111     val hadIMState = _haveIMState
112
113     if(hadIMState) {
114       val newOccurredMillis = imEvent.occurredMillis
115       val currentOccurredMillis = this._imState.imEvent.occurredMillis
116
117       if(newOccurredMillis < currentOccurredMillis) {
118         INFO(
119           "Ignoring older IMEvent: [%s] < [%s]",
120           new MutableDateCalc(newOccurredMillis).toYYYYMMDDHHMMSSSSS,
121           new MutableDateCalc(currentOccurredMillis).toYYYYMMDDHHMMSSSSS)
122
123         return
124       }
125     }
126
127     this._imState = IMStateSnapshot(imEvent)
128     DEBUG("%s %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState))
129   }
130
131   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
132     val userId = event.userID
133     // FIXME: Implement
134     self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
135   }
136
137   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
138     val userId = event.userID
139    // FIXME: Implement
140     self reply GetUserStateResponse(userId, Right(this._userState))
141   }
142
143   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
144     val rcEvent = event.rcEvent
145
146     logger.info("Got\n{}", rcEvent.toJsonString)
147   }
148
149
150   private[this] def D_userID = {
151     if(this._userState eq null)
152       if(this._imState eq null)
153         "<NOT INITIALIZED>"
154       else
155         this._imState.imEvent.userID
156     else
157       this._userState.userID
158   }
159
160   private[this] def DEBUG(fmt: String, args: Any*) =
161     logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
162
163   private[this] def INFO(fmt: String, args: Any*) =
164     logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
165
166   private[this] def WARN(fmt: String, args: Any*) =
167     logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
168
169   private[this] def ERROR(fmt: String, args: Any*) =
170     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
171
172   private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
173     logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
174 }