root / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala @ 060b04bd
History | View | Annotate | Download (14.7 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.actor |
37 |
package service |
38 |
package user |
39 |
|
40 |
import gr.grnet.aquarium.{Real, AquariumInternalError} |
41 |
import gr.grnet.aquarium.actor.message.GetUserBalanceRequest |
42 |
import gr.grnet.aquarium.actor.message.GetUserBalanceResponse |
43 |
import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData |
44 |
import gr.grnet.aquarium.actor.message.GetUserBillRequest |
45 |
import gr.grnet.aquarium.actor.message.GetUserBillResponse |
46 |
import gr.grnet.aquarium.actor.message.GetUserBillResponseData |
47 |
import gr.grnet.aquarium.actor.message.GetUserStateRequest |
48 |
import gr.grnet.aquarium.actor.message.GetUserStateResponse |
49 |
import gr.grnet.aquarium.actor.message.GetUserWalletRequest |
50 |
import gr.grnet.aquarium.actor.message.GetUserWalletResponse |
51 |
import gr.grnet.aquarium.actor.message.GetUserWalletResponseData |
52 |
import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded |
53 |
import gr.grnet.aquarium.charging.state.UserStateModel |
54 |
import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg} |
55 |
import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers} |
56 |
import gr.grnet.aquarium.service.event.BalanceEvent |
57 |
import gr.grnet.aquarium.util.date.TimeHelpers |
58 |
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} |
59 |
import gr.grnet.aquarium.policy.{ResourceType, PolicyModel} |
60 |
import gr.grnet.aquarium.charging.bill.BillEntryMsg |
61 |
|
62 |
/** |
63 |
* |
64 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
65 |
*/ |
66 |
|
67 |
class UserActor extends ReflectiveRoleableActor { |
68 |
private[this] var _imMsgCount = 0 |
69 |
private[this] var _userStateModel: UserStateModel = _ |
70 |
|
71 |
def userID = { |
72 |
if(!haveUserState) { |
73 |
throw new AquariumInternalError("%s not initialized") |
74 |
} |
75 |
|
76 |
this._userStateModel.userID |
77 |
} |
78 |
|
79 |
override def postStop() { |
80 |
DEBUG("I am finally stopped (in postStop())") |
81 |
aquarium.akkaService.notifyUserActorPostStop(this) |
82 |
} |
83 |
|
84 |
private[this] def shutmedown(): Unit = { |
85 |
if(haveUserState) { |
86 |
aquarium.akkaService.invalidateUserActor(this) |
87 |
} |
88 |
} |
89 |
|
90 |
override protected def onThrowable(t: Throwable, message: AnyRef) = { |
91 |
LogHelpers.logChainOfCauses(logger, t) |
92 |
ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage) |
93 |
|
94 |
shutmedown() |
95 |
} |
96 |
|
97 |
def role = UserActorRole |
98 |
|
99 |
private[this] def chargingService = aquarium.chargingService |
100 |
|
101 |
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { |
102 |
} |
103 |
|
104 |
private[this] def unsafeUserCreationIMEventMsg = { |
105 |
this._userStateModel.unsafeUserCreationIMEvent |
106 |
} |
107 |
|
108 |
private[this] def haveAgreements = { |
109 |
(this._userStateModel ne null) |
110 |
} |
111 |
|
112 |
private[this] def haveUserCreationEvent = { |
113 |
haveAgreements && |
114 |
this._userStateModel.hasUserCreationEvent |
115 |
} |
116 |
|
117 |
private[this] def haveUserState = { |
118 |
(this._userStateModel ne null) |
119 |
} |
120 |
|
121 |
private[this] def isInitial = this._userStateModel.isInitial |
122 |
|
123 |
/** |
124 |
* Creates the agreement history from all the stored IMEvents. |
125 |
* |
126 |
* @return (`true` iff there was a user CREATE event, the number of events processed) |
127 |
*/ |
128 |
private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = { |
129 |
DEBUG("createUserAgreementHistoryFromStoredIMEvents()") |
130 |
assert(haveUserState, "haveUserState") |
131 |
|
132 |
|
133 |
var _imcounter = 0 |
134 |
|
135 |
val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒ |
136 |
_imcounter += 1 |
137 |
DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent) |
138 |
|
139 |
if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) { |
140 |
// The very first event must be a CREATE event. Otherwise we abort initialization. |
141 |
// This will normally happen during devops :) |
142 |
INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent)) |
143 |
false |
144 |
} |
145 |
else { |
146 |
val effectiveFromMillis = imEvent.getOccurredMillis |
147 |
val role = imEvent.getRole |
148 |
// calling unsafe just for the side-effect |
149 |
assert( |
150 |
aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null, |
151 |
"aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis) |
152 |
) |
153 |
|
154 |
this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) |
155 |
true |
156 |
} |
157 |
} |
158 |
|
159 |
this._imMsgCount = _imcounter |
160 |
|
161 |
DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) |
162 |
(hadCreateEvent, _imcounter) |
163 |
} |
164 |
|
165 |
private[this] def saveFirstUserState(userID: String) { |
166 |
this._userStateModel.userStateMsg.setIsFirst(true) |
167 |
this._userStateModel.updateUserStateMsg( |
168 |
aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) |
169 |
) |
170 |
} |
171 |
|
172 |
private[this] def saveSubsequentUserState() { |
173 |
this._userStateModel.userStateMsg.setIsFirst(false) |
174 |
this._userStateModel.updateUserStateMsg( |
175 |
aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg) |
176 |
) |
177 |
} |
178 |
|
179 |
private[this] def loadLastKnownUserStateAndUpdateAgreements() { |
180 |
val userID = this._userStateModel.userID |
181 |
aquarium.userStateStore.findLatestUserState(userID) match { |
182 |
case None ⇒ |
183 |
// First user state ever |
184 |
saveFirstUserState(userID) |
185 |
|
186 |
case Some(latestUserState) ⇒ |
187 |
this._userStateModel.updateUserStateMsg(latestUserState) |
188 |
} |
189 |
} |
190 |
|
191 |
private[this] def processResourceEventsAfterLastKnownUserState() { |
192 |
// Update the user state snapshot with fresh (ie not previously processed) events. |
193 |
aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod( |
194 |
this._userStateModel.userID, |
195 |
this._userStateModel.latestResourceEventOccurredMillis, |
196 |
TimeHelpers.nowMillis() |
197 |
) |
198 |
} |
199 |
|
200 |
private[this] def makeUserStateMsgUpToDate() { |
201 |
loadLastKnownUserStateAndUpdateAgreements() |
202 |
processResourceEventsAfterLastKnownUserState() |
203 |
} |
204 |
|
205 |
private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = { |
206 |
if(!isInitial) { |
207 |
return false |
208 |
} |
209 |
|
210 |
val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID) |
211 |
|
212 |
if(userCreated) { |
213 |
makeUserStateMsgUpToDate() |
214 |
} |
215 |
|
216 |
nextThing() |
217 |
|
218 |
true |
219 |
} |
220 |
|
221 |
/** |
222 |
* Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the |
223 |
* messaging hub (rabbitmq). |
224 |
*/ |
225 |
def onIMEventMsg(imEvent: IMEventMsg) { |
226 |
if(checkInitial()) { |
227 |
return |
228 |
} |
229 |
|
230 |
// Check for out of sync (regarding IMEvents) |
231 |
val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis |
232 |
if(isOutOfSyncIM) { |
233 |
// clear all resource state |
234 |
// FIXME implement |
235 |
|
236 |
return |
237 |
} |
238 |
|
239 |
// Check out of sync (regarding ResourceEvents) |
240 |
val isOutOfSyncRC = false // FIXME implement |
241 |
if(isOutOfSyncRC) { |
242 |
// TODO |
243 |
|
244 |
return |
245 |
} |
246 |
|
247 |
// Make new agreement |
248 |
this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent) |
249 |
this._imMsgCount += 1 |
250 |
|
251 |
if(MessageHelpers.isUserCreationIMEvent(imEvent)) { |
252 |
makeUserStateMsgUpToDate() |
253 |
} |
254 |
|
255 |
DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg) |
256 |
} |
257 |
|
258 |
def onResourceEventMsg(rcEvent: ResourceEventMsg) { |
259 |
if(checkInitial()) { |
260 |
return |
261 |
} |
262 |
|
263 |
if(!haveUserCreationEvent) { |
264 |
DEBUG("No agreements. Ignoring %s", rcEvent) |
265 |
|
266 |
return |
267 |
} |
268 |
|
269 |
assert(haveUserState, "haveUserState") |
270 |
|
271 |
val oldTotalCredits = this._userStateModel.totalCreditsAsReal |
272 |
|
273 |
chargingService.processResourceEvent( |
274 |
rcEvent.getReceivedMillis, |
275 |
rcEvent, |
276 |
this._userStateModel, |
277 |
aquarium.currentResourceMapping, |
278 |
true |
279 |
) |
280 |
|
281 |
val newTotalCredits = this._userStateModel.totalCreditsAsReal |
282 |
|
283 |
if(oldTotalCredits.signum * newTotalCredits.signum < 0) { |
284 |
aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0) |
285 |
} |
286 |
|
287 |
DEBUG("Updated %s", this._userStateModel) |
288 |
} |
289 |
|
290 |
def onGetUserBillRequest(event: GetUserBillRequest): Unit = { |
291 |
checkInitial() |
292 |
|
293 |
try{ |
294 |
val timeslot = event.timeslot |
295 |
val resourceTypes: Map[String, ResourceType] = aquarium.policyStore. |
296 |
loadSortedPolicyModelsWithin(timeslot.from.getTime, |
297 |
timeslot.to.getTime). |
298 |
values.headOption match { |
299 |
case None => Map[String,ResourceType]() |
300 |
case Some(policy:PolicyModel) => policy.resourceTypesMap |
301 |
} |
302 |
val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None |
303 |
val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes) |
304 |
//val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry) |
305 |
//logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString) |
306 |
val billData = GetUserBillResponseData(this.userID,billEntryMsg) |
307 |
sender ! GetUserBillResponse(Right(billData)) |
308 |
} catch { |
309 |
case e:Exception => |
310 |
e.printStackTrace() |
311 |
sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500) |
312 |
} |
313 |
} |
314 |
|
315 |
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { |
316 |
checkInitial() |
317 |
|
318 |
val userID = event.userID |
319 |
|
320 |
(haveAgreements, haveUserState) match { |
321 |
case (true, true) ⇒ |
322 |
// (User CREATEd, with balance state) |
323 |
val realtimeMillis = TimeHelpers.nowMillis() |
324 |
chargingService.calculateRealtimeUserState( |
325 |
this._userStateModel, |
326 |
aquarium.currentResourceMapping, |
327 |
realtimeMillis |
328 |
) |
329 |
|
330 |
sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits))) |
331 |
|
332 |
case (true, false) ⇒ |
333 |
// (User CREATEd, no balance state) |
334 |
// Return the default initial balance |
335 |
sender ! GetUserBalanceResponse( |
336 |
Right( |
337 |
GetUserBalanceResponseData( |
338 |
this.userID, |
339 |
aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString() |
340 |
))) |
341 |
|
342 |
case (false, true) ⇒ |
343 |
// (Not CREATEd, with balance state) |
344 |
// Clearly this is internal error |
345 |
sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500) |
346 |
|
347 |
case (false, false) ⇒ |
348 |
// (Not CREATEd, no balance state) |
349 |
// The user is completely unknown |
350 |
sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/) |
351 |
} |
352 |
} |
353 |
|
354 |
def onGetUserStateRequest(event: GetUserStateRequest): Unit = { |
355 |
checkInitial() |
356 |
|
357 |
haveUserState match { |
358 |
case true ⇒ |
359 |
val realtimeMillis = TimeHelpers.nowMillis() |
360 |
chargingService.calculateRealtimeUserState( |
361 |
this._userStateModel, |
362 |
aquarium.currentResourceMapping, |
363 |
realtimeMillis |
364 |
) |
365 |
|
366 |
sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg)) |
367 |
|
368 |
case false ⇒ |
369 |
sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404) |
370 |
} |
371 |
} |
372 |
|
373 |
def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = { |
374 |
checkInitial() |
375 |
|
376 |
haveUserState match { |
377 |
case true ⇒ |
378 |
DEBUG("haveWorkingUserState: %s", event) |
379 |
val realtimeMillis = TimeHelpers.nowMillis() |
380 |
chargingService.calculateRealtimeUserState( |
381 |
this._userStateModel, |
382 |
aquarium.currentResourceMapping, |
383 |
realtimeMillis |
384 |
) |
385 |
|
386 |
sender ! GetUserWalletResponse( |
387 |
Right( |
388 |
GetUserWalletResponseData( |
389 |
this.userID, |
390 |
this._userStateModel.totalCredits, |
391 |
MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries) |
392 |
))) |
393 |
|
394 |
case false ⇒ |
395 |
DEBUG("!haveWorkingUserState: %s", event) |
396 |
haveAgreements match { |
397 |
case true ⇒ |
398 |
DEBUG("haveAgreements: %s", event) |
399 |
sender ! GetUserWalletResponse( |
400 |
Right( |
401 |
GetUserWalletResponseData( |
402 |
this.userID, |
403 |
Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)), |
404 |
MessageFactory.newWalletEntriesMsg() |
405 |
))) |
406 |
|
407 |
case false ⇒ |
408 |
DEBUG("!haveUserCreationIMEvent: %s", event) |
409 |
sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404) |
410 |
} |
411 |
} |
412 |
} |
413 |
|
414 |
/** |
415 |
* Initializes the actor's internal state. |
416 |
* |
417 |
* @param userID |
418 |
*/ |
419 |
def onSetUserActorUserID(userID: String) { |
420 |
// Create the full agreement history from the original sources (IMEvents) |
421 |
this._userStateModel = ModelFactory.newInitialUserStateModel( |
422 |
userID, |
423 |
Real(0), |
424 |
TimeHelpers.nowMillis() |
425 |
) |
426 |
|
427 |
require(this._userStateModel.isInitial, "this._userStateModel.isInitial") |
428 |
} |
429 |
|
430 |
private[this] def D_userID = { |
431 |
if(haveUserState) userID else "???" |
432 |
} |
433 |
|
434 |
private[this] def DEBUG(fmt: String, args: Any*) = |
435 |
logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*))) |
436 |
|
437 |
private[this] def INFO(fmt: String, args: Any*) = |
438 |
logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*))) |
439 |
|
440 |
private[this] def WARN(fmt: String, args: Any*) = |
441 |
logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*))) |
442 |
|
443 |
private[this] def ERROR(fmt: String, args: Any*) = |
444 |
logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*))) |
445 |
|
446 |
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = |
447 |
logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t) |
448 |
} |