root / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala @ 16319b44
History | View | Annotate | Download (25.1 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.user.actor |
37 |
|
38 |
import gr.grnet.aquarium.actor._ |
39 |
import gr.grnet.aquarium.Configurator |
40 |
import gr.grnet.aquarium.processor.actor._ |
41 |
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} |
42 |
import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting} |
43 |
import gr.grnet.aquarium.user._ |
44 |
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent} |
45 |
import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLResource, DSLSimpleResource, DSLComplexResource} |
46 |
import java.util.Date |
47 |
import gr.grnet.aquarium.util.{DateUtils, TimeHelpers, Loggable} |
48 |
|
49 |
|
50 |
/** |
51 |
* |
52 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
53 |
*/ |
54 |
|
55 |
class UserActor extends AquariumActor |
56 |
with Loggable with Accounting with DateUtils { |
57 |
@volatile |
58 |
private[this] var _userId: String = _ |
59 |
@volatile |
60 |
private[this] var _userState: UserState = _ |
61 |
@volatile |
62 |
private[this] var _timestampTheshold: Long = _ |
63 |
|
64 |
def role = UserActorRole |
65 |
|
66 |
private[this] def _configurator: Configurator = Configurator.MasterConfigurator |
67 |
|
68 |
private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = { |
69 |
if(checkForOlderEvents) { |
70 |
DEBUG("Checking for events older than %s" format resourceEvent) |
71 |
processOlderResourceEvents(resourceEvent) |
72 |
} |
73 |
|
74 |
justProcessTheResourceEvent(resourceEvent, "ACTUAL") |
75 |
} |
76 |
|
77 |
|
78 |
/** |
79 |
* Given an "onoff" event, we try to locate all unprocessed resource events that precede this one. |
80 |
*/ |
81 |
def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
82 |
Nil |
83 |
} |
84 |
|
85 |
def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
86 |
Nil |
87 |
} |
88 |
|
89 |
/** |
90 |
* Find resource events that precede the given one and are unprocessed. |
91 |
*/ |
92 |
private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
93 |
if(rcEvent.isOnOffEvent(policy)) { |
94 |
findOlderResourceEventsForOnOff(rcEvent, policy) |
95 |
} else { |
96 |
findOlderResourceEventsForOther(rcEvent, policy) |
97 |
} |
98 |
} |
99 |
|
100 |
/** |
101 |
* Find and process older resource events. |
102 |
* |
103 |
* Older resource events are found based on the latest credit calculation, that is the latest |
104 |
* wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries |
105 |
* have been calculated for these resource events and start from there. |
106 |
*/ |
107 |
private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = { |
108 |
assert(_userId == resourceEvent.userId) |
109 |
val rceId = resourceEvent.id |
110 |
val userId = resourceEvent.userId |
111 |
val resourceEventStore = _configurator.resourceEventStore |
112 |
val walletEntriesStore = _configurator.walletStore |
113 |
|
114 |
// 1. Find latest wallet entry |
115 |
val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId) |
116 |
latestWalletEntriesM match { |
117 |
case Just(latestWalletEntries) ⇒ |
118 |
// The time on which we base the selection of the older events |
119 |
val selectionTime = latestWalletEntries.head.occurredMillis |
120 |
|
121 |
// 2. Now find all resource events past the time of the latest wallet entry. |
122 |
// These events have not been processed, except probably those ones |
123 |
// that have the same `occurredMillis` with `selectionTime` |
124 |
val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime) |
125 |
|
126 |
// 3. Filter out those resource events for which no wallet entry has actually been |
127 |
// produced. |
128 |
val rcEventsToProcess = for { |
129 |
oldRCEvent <- oldRCEvents |
130 |
oldRCEventId = oldRCEvent.id |
131 |
latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId) |
132 |
} yield { |
133 |
oldRCEvent |
134 |
} |
135 |
|
136 |
DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent)) |
137 |
|
138 |
for { |
139 |
rcEventToProcess <- rcEventsToProcess |
140 |
} { |
141 |
justProcessTheResourceEvent(rcEventToProcess, "OLDER") |
142 |
} |
143 |
case NoVal ⇒ |
144 |
DEBUG("No events to process older than %s".format(resourceEvent)) |
145 |
case Failed(e, m) ⇒ |
146 |
ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage)) |
147 |
} |
148 |
} |
149 |
|
150 |
private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = { |
151 |
val walletEntriesStore = _configurator.walletStore |
152 |
for(walletEntry <- walletEntries) { |
153 |
val allow = walletEntry.finalized || allowNonFinalized |
154 |
if(allow) { |
155 |
walletEntriesStore.storeWalletEntry(walletEntry) |
156 |
} |
157 |
} |
158 |
} |
159 |
|
160 |
private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = { |
161 |
val newCredits = for { |
162 |
walletEntry <- walletEntries if(walletEntry.finalized) |
163 |
} yield walletEntry.value.toDouble |
164 |
|
165 |
newCredits.sum |
166 |
} |
167 |
|
168 |
/** |
169 |
* Process the resource event as if nothing else matters. Just do it. |
170 |
*/ |
171 |
private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = { |
172 |
val start = System.currentTimeMillis |
173 |
DEBUG("Processing [%s] %s".format(logLabel, ev)) |
174 |
|
175 |
// Initially, the user state (regarding resources) is empty. |
176 |
// So we have to compensate for both a totally empty resource state |
177 |
// and the update with new values. |
178 |
|
179 |
// 1. Find the resource definition |
180 |
val policy = Policy.policy |
181 |
policy.findResource(ev.resource) match { |
182 |
case Some(resource) ⇒ |
183 |
// 2. Get the instance id and value for the resource |
184 |
val instanceIdM = resource match { |
185 |
// 2.1 If it is complex, from the details map, get the field which holds the instanceId |
186 |
case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒ |
187 |
ev.details.get(descriminatorField) match { |
188 |
case Some(instanceId) ⇒ |
189 |
Just(instanceId) |
190 |
case None ⇒ |
191 |
// We should have some value under this key here.... |
192 |
Failed(throw new AccountingException("")) //TODO: See what to do here |
193 |
} |
194 |
// 2.2 If it is simple, ... |
195 |
case DSLSimpleResource(name, unit, costPolicy) ⇒ |
196 |
// ... by convention, the instanceId of a simple resource is just "1" |
197 |
// @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]] |
198 |
Just("1") |
199 |
} |
200 |
|
201 |
// 3. Did we get a valid instanceId? |
202 |
instanceIdM match { |
203 |
// 3.1 Yes, time to get/update the current state |
204 |
case Just(instanceId) ⇒ |
205 |
val oldOwnedResources = _userState.ownedResources |
206 |
|
207 |
// A. First state diff: the modified resource value |
208 |
val StateChangeMillis = TimeHelpers.nowMillis |
209 |
val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources. |
210 |
addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis) |
211 |
val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime) |
212 |
|
213 |
// Calculate the wallet entries generated from this resource event |
214 |
_userState.maybeDSLAgreement match { |
215 |
case Just(agreement) ⇒ |
216 |
val walletEntriesM = chargeEvent(ev, agreement, ev.value, |
217 |
new Date(previousRCUpdateTime), |
218 |
findRelatedEntries(resource, ev.getInstanceId(policy))) |
219 |
walletEntriesM match { |
220 |
case Just(walletEntries) ⇒ |
221 |
_storeWalletEntries(walletEntries, true) |
222 |
|
223 |
// B. Second state diff: the new credits |
224 |
val newCreditsSum = _calcNewCreditSum(walletEntries) |
225 |
val oldCredits = _userState.safeCredits.data |
226 |
val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis) |
227 |
|
228 |
// Finally, the userState change |
229 |
DEBUG("Credits = %s".format(this._userId, newCredits)) |
230 |
DEBUG("Resources = %s".format(this._userId, newOwnedResources)) |
231 |
this._userState = this._userState.copy( |
232 |
credits = newCredits, |
233 |
ownedResources = newOwnedResources |
234 |
) |
235 |
case NoVal ⇒ |
236 |
DEBUG("No wallet entries generated for %s".format(ev)) |
237 |
case failed @ Failed(e, m) ⇒ |
238 |
failed |
239 |
} |
240 |
|
241 |
case NoVal ⇒ |
242 |
Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId))) |
243 |
case failed @ Failed(e, m) ⇒ |
244 |
failed |
245 |
} |
246 |
// 3.2 No, no luck, this is an error |
247 |
case NoVal ⇒ |
248 |
Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId))) |
249 |
case failed @ Failed(e, m) ⇒ |
250 |
failed |
251 |
} |
252 |
// No resource definition found, this is an error |
253 |
case None ⇒ // Policy.policy.findResource(ev.resource) |
254 |
Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId))) |
255 |
} |
256 |
|
257 |
DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start)) |
258 |
} |
259 |
|
260 |
private[this] def processCreateUser(event: UserEvent): Unit = { |
261 |
val userId = event.userId |
262 |
DEBUG("Creating user from state %s", event) |
263 |
val usersDB = _configurator.storeProvider.userStateStore |
264 |
usersDB.findUserStateByUserId(userId) match { |
265 |
case Just(userState) ⇒ |
266 |
WARN("User already created, state = %s".format(userState)) |
267 |
case failed @ Failed(e, m) ⇒ |
268 |
ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m) |
269 |
case NoVal ⇒ |
270 |
// OK. Create a default UserState and store it |
271 |
val now = TimeHelpers.nowMillis |
272 |
val agreementOpt = Policy.policy.findAgreement("default") |
273 |
|
274 |
if(agreementOpt.isEmpty) { |
275 |
ERROR("No default agreement found. Cannot initialize user state") |
276 |
} else { |
277 |
this._userState = UserState( |
278 |
userId, |
279 |
0, |
280 |
ActiveSuspendedSnapshot(event.isStateActive, now), |
281 |
CreditSnapshot(0, now), |
282 |
AgreementSnapshot(agreementOpt.get.name, now), |
283 |
RolesSnapshot(event.roles, now), |
284 |
PaymentOrdersSnapshot(Nil, now), |
285 |
OwnedGroupsSnapshot(Nil, now), |
286 |
GroupMembershipsSnapshot(Nil, now), |
287 |
OwnedResourcesSnapshot(List(), now) |
288 |
) |
289 |
|
290 |
saveUserState |
291 |
DEBUG("Created and stored %s", this._userState) |
292 |
} |
293 |
} |
294 |
} |
295 |
|
296 |
private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = { |
297 |
val walletDB = _configurator.storeProvider.walletEntryStore |
298 |
walletDB.findPreviousEntry(_userId, res.name, instid, Some(false)) |
299 |
} |
300 |
|
301 |
|
302 |
private[this] def processModifyUser(event: UserEvent): Unit = { |
303 |
val now = TimeHelpers.nowMillis |
304 |
val newActive = ActiveSuspendedSnapshot(event.isStateActive, now) |
305 |
|
306 |
DEBUG("New active status = %s".format(newActive)) |
307 |
|
308 |
this._userState = this._userState.copy( active = newActive ) |
309 |
} |
310 |
/** |
311 |
* Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state. |
312 |
*/ |
313 |
private[this] def processUserEvent(event: UserEvent): Unit = { |
314 |
if(event.isCreateUser) { |
315 |
processCreateUser(event) |
316 |
} else if(event.isModifyUser) { |
317 |
processModifyUser(event) |
318 |
} |
319 |
} |
320 |
|
321 |
/** |
322 |
* Try to load from the DB the latest known info (snapshot data) for the given user. |
323 |
*/ |
324 |
private[this] def findUserState(userId: String): Maybe[UserState] = { |
325 |
val usersDB = _configurator.storeProvider.userStateStore |
326 |
usersDB.findUserStateByUserId(userId) |
327 |
} |
328 |
|
329 |
/** |
330 |
* Tries to makes sure that the internal user state exists. |
331 |
* |
332 |
* May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that. |
333 |
* |
334 |
*/ |
335 |
private[this] def ensureUserState(): Unit = { |
336 |
/*if(null eq this._userState) { |
337 |
findUserState(this._userId) match { |
338 |
case Just(userState) ⇒ |
339 |
DEBUG("Loaded user state %s from DB", userState) |
340 |
//TODO: May be out of sync with the event store, rebuild it here |
341 |
this._userState = userState |
342 |
rebuildState(this._userState.oldestSnapshotTime) |
343 |
case Failed(e, m) ⇒ |
344 |
ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m) |
345 |
case NoVal ⇒ |
346 |
//TODO: Rebuild actor state here. |
347 |
rebuildState(0) |
348 |
WARN("Request for unknown (to Aquarium) user") |
349 |
} |
350 |
}*/ |
351 |
|
352 |
if (_userState == null) |
353 |
rebuildState(0) |
354 |
else |
355 |
rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis()) |
356 |
} |
357 |
|
358 |
/** |
359 |
* Replay the event log for all events that affect the user state, starting |
360 |
* from the provided time instant. |
361 |
*/ |
362 |
def rebuildState(from: Long): Unit = |
363 |
rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime) |
364 |
|
365 |
/** |
366 |
* Replay the event log for all events that affect the user state. |
367 |
*/ |
368 |
def rebuildState(from: Long, to: Long): Unit = { |
369 |
val start = System.currentTimeMillis() |
370 |
if (_userState == null) |
371 |
createBlankState |
372 |
|
373 |
//Rebuild state from user events |
374 |
val usersDB = _configurator.storeProvider.userEventStore |
375 |
val userEvents = usersDB.findUserEventsByUserId(_userId) |
376 |
val numUserEvents = userEvents.size |
377 |
_userState = replayUserEvents(_userState, userEvents, from, to) |
378 |
|
379 |
//Rebuild state from resource events |
380 |
val eventsDB = _configurator.storeProvider.resourceEventStore |
381 |
val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from) |
382 |
val numResourceEvents = resourceEvents.size |
383 |
_userState = replayResourceEvents(_userState, resourceEvents, from, to) |
384 |
|
385 |
//Rebuild state from wallet entries |
386 |
val wallet = _configurator.storeProvider.walletEntryStore |
387 |
val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from)) |
388 |
val numWalletEntries = walletEnties.size |
389 |
_userState = replayWalletEntries(_userState, walletEnties, from, to) |
390 |
|
391 |
INFO(("Rebuilt state from %d events (%d user events, " + |
392 |
"%d resource events, %d wallet entries) in %d msec").format( |
393 |
numUserEvents + numResourceEvents + numWalletEntries, |
394 |
numUserEvents, numResourceEvents, numWalletEntries, |
395 |
System.currentTimeMillis() - start)) |
396 |
} |
397 |
|
398 |
/** |
399 |
* Create an empty state for a user |
400 |
*/ |
401 |
def createBlankState = { |
402 |
val now = System.currentTimeMillis() |
403 |
val agreement = Policy.policy.findAgreement("default") |
404 |
|
405 |
this._userState = UserState( |
406 |
_userId, |
407 |
0, |
408 |
ActiveSuspendedSnapshot(false, now), |
409 |
CreditSnapshot(0, now), |
410 |
AgreementSnapshot(agreement.get.name, now), |
411 |
RolesSnapshot(List(), now), |
412 |
PaymentOrdersSnapshot(Nil, now), |
413 |
OwnedGroupsSnapshot(Nil, now), |
414 |
GroupMembershipsSnapshot(Nil, now), |
415 |
OwnedResourcesSnapshot(List(), now) |
416 |
) |
417 |
} |
418 |
|
419 |
/** |
420 |
* Replay user events on the provided user state |
421 |
*/ |
422 |
def replayUserEvents(initState: UserState, events: List[UserEvent], |
423 |
from: Long, to: Long): UserState = { |
424 |
var act = initState.active |
425 |
var rol = initState.roles |
426 |
events |
427 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
428 |
.foreach { |
429 |
e => |
430 |
act = act.copy( |
431 |
data = e.isStateActive, snapshotTime = e.occurredMillis) |
432 |
// TODO: Implement the following |
433 |
//_userState.agreement = _userState.agreement.copy( |
434 |
// data = e.newAgreement, e.occurredMillis) |
435 |
|
436 |
rol = rol.copy(data = e.roles, |
437 |
snapshotTime = e.occurredMillis) |
438 |
} |
439 |
initState.copy(active = act, roles = rol) |
440 |
} |
441 |
|
442 |
/** |
443 |
* Replay resource events on the provided user state |
444 |
*/ |
445 |
def replayResourceEvents(initState: UserState, events: List[ResourceEvent], |
446 |
from: Long, to: Long): UserState = { |
447 |
var res = initState.ownedResources |
448 |
events |
449 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
450 |
.foreach { |
451 |
e => |
452 |
val name = Policy.policy.findResource(e.resource) match { |
453 |
case Some(x) => x.name |
454 |
case None => "" |
455 |
} |
456 |
|
457 |
val instanceId = e.getInstanceId(Policy.policy) |
458 |
res = res.addOrUpdateResourceSnapshot(name, |
459 |
instanceId, e.value, e.occurredMillis)._1 |
460 |
} |
461 |
if (!events.isEmpty) { |
462 |
val snapTime = events.map{e => e.occurredMillis}.max |
463 |
res = res.copy(snapshotTime = snapTime) |
464 |
} |
465 |
initState.copy(ownedResources = res) |
466 |
} |
467 |
|
468 |
/** |
469 |
* Replay wallet entries on the provided user state |
470 |
*/ |
471 |
def replayWalletEntries(initState: UserState, events: List[WalletEntry], |
472 |
from: Long, to: Long): UserState = { |
473 |
var cred = initState.credits |
474 |
events |
475 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
476 |
.foreach { |
477 |
w => |
478 |
val newVal = cred.data + w.value |
479 |
cred = cred.copy(data = newVal) |
480 |
} |
481 |
if (!events.isEmpty) { |
482 |
val snapTime = events.map{e => e.occurredMillis}.max |
483 |
cred = cred.copy(snapshotTime = snapTime) |
484 |
} |
485 |
initState.copy(credits = cred) |
486 |
} |
487 |
|
488 |
/** |
489 |
* Update wallet entries for all unprocessed events |
490 |
*/ |
491 |
def calcWalletEntries(): Unit = { |
492 |
ensureUserState |
493 |
|
494 |
if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return |
495 |
val eventsDB = _configurator.storeProvider.resourceEventStore |
496 |
val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime) |
497 |
val policy = Policy.policy |
498 |
|
499 |
val walletEntries = resourceEvents.map { |
500 |
ev => |
501 |
// TODO: Check that agreement exists |
502 |
val agr = policy.findAgreement(_userState.agreement.data).get |
503 |
|
504 |
val resource = policy.findResource(ev.resource) match { |
505 |
case Some(x) => x |
506 |
case None => |
507 |
val errMsg = "Cannot find resource: %s".format(ev.resource) |
508 |
ERROR(errMsg) |
509 |
throw new AccountingException(errMsg) // FIXME: to throw or not to throw? |
510 |
} |
511 |
|
512 |
val instid = resource.isComplex match { |
513 |
case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField) |
514 |
case false => None |
515 |
} |
516 |
|
517 |
var curValue = 0F |
518 |
var lastUpdate = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)) match { |
519 |
case Some(x) => x.snapshotTime |
520 |
case None => Long.MaxValue //To trigger recalculation |
521 |
} |
522 |
|
523 |
if (lastUpdate > ev.occurredMillis) { |
524 |
//Event is older that current state. Rebuild state up to event timestamp |
525 |
val resHistory = |
526 |
ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.eventVersion, 0, ev.details) :: |
527 |
eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis) |
528 |
INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis)); |
529 |
var res = OwnedResourcesSnapshot(List(), 0) |
530 |
resHistory.foreach { |
531 |
e => |
532 |
res = res.addOrUpdateResourceSnapshot(e.resource, e.getInstanceId(policy), e.value, e.occurredMillis)._1 |
533 |
} |
534 |
lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime |
535 |
curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data |
536 |
} else { |
537 |
curValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data |
538 |
} |
539 |
|
540 |
val entries = chargeEvent(ev, agr, curValue, new Date(lastUpdate), |
541 |
findRelatedEntries(resource, ev.getInstanceId(policy))) |
542 |
INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis)) |
543 |
entries match { |
544 |
case Just(x) => x |
545 |
case Failed(e, r) => List() |
546 |
case NoVal => List() |
547 |
} |
548 |
}.flatten |
549 |
|
550 |
val walletDB = _configurator.storeProvider.walletEntryStore |
551 |
walletEntries.foreach(w => walletDB.storeWalletEntry(w)) |
552 |
|
553 |
ensureUserState |
554 |
} |
555 |
|
556 |
/** |
557 |
* Persist current user state |
558 |
*/ |
559 |
private[this] def saveUserState(): Unit = { |
560 |
_configurator.storeProvider.userStateStore.deleteUserState(this._userId) |
561 |
_configurator.storeProvider.userStateStore.storeUserState(this._userState) match { |
562 |
case Just(record) => record |
563 |
case NoVal => ERROR("Unknown error saving state") |
564 |
case Failed(e, a) => |
565 |
ERROR("Saving state failed: %s error was: %s".format(a,e)); |
566 |
} |
567 |
} |
568 |
|
569 |
protected def receive: Receive = { |
570 |
case m @ AquariumPropertiesLoaded(props) ⇒ |
571 |
this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000) |
572 |
INFO("Setup my timestampTheshold = %s", this._timestampTheshold) |
573 |
|
574 |
case m @ UserActorInitWithUserId(userId) ⇒ |
575 |
this._userId = userId |
576 |
DEBUG("Actor starting, loading state") |
577 |
ensureUserState() |
578 |
|
579 |
case m @ ProcessResourceEvent(resourceEvent) ⇒ |
580 |
if(resourceEvent.userId != this._userId) { |
581 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
582 |
} else { |
583 |
//ensureUserState() |
584 |
calcWalletEntries() |
585 |
//processResourceEvent(resourceEvent, true) |
586 |
} |
587 |
|
588 |
case m @ ProcessUserEvent(userEvent) ⇒ |
589 |
if(userEvent.userId != this._userId) { |
590 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
591 |
} else { |
592 |
ensureUserState() |
593 |
processUserEvent(userEvent) |
594 |
} |
595 |
|
596 |
case m @ RequestUserBalance(userId, timestamp) ⇒ |
597 |
/*if(this._userId != userId) { |
598 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
599 |
// TODO: throw an exception here |
600 |
} else { |
601 |
// This is the big party. |
602 |
// Get the user state, if it exists and make sure it is not stale. |
603 |
ensureUserState() |
604 |
|
605 |
// Do we have a user state? |
606 |
if(_userState ne null) { |
607 |
// Yep, we do. See what there is inside it. |
608 |
val credits = _userState.credits |
609 |
val creditsTimestamp = credits.snapshotTime |
610 |
|
611 |
// Check if data is stale |
612 |
if(creditsTimestamp + _timestampTheshold > timestamp) { |
613 |
// No, it's OK |
614 |
self reply UserResponseGetBalance(userId, credits.data) |
615 |
} else { |
616 |
// Yep, data is stale and must recompute balance |
617 |
// FIXME: implement |
618 |
ERROR("FIXME: Should have computed a new value for %s".format(credits)) |
619 |
self reply UserResponseGetBalance(userId, credits.data) |
620 |
} |
621 |
} else { |
622 |
// No user state exists. This is an error. |
623 |
val errMsg = "Could not load user state for %s".format(m) |
624 |
ERROR(errMsg) |
625 |
self reply ResponseUserBalance(userId, 0, Some(errMsg)) |
626 |
}*/ |
627 |
if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000) |
628 |
calcWalletEntries() |
629 |
self reply UserResponseGetBalance(userId, _userState.credits.data) |
630 |
//} |
631 |
|
632 |
case m @ UserRequestGetState(userId, timestamp) ⇒ |
633 |
if(this._userId != userId) { |
634 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
635 |
// TODO: throw an exception here |
636 |
} else { |
637 |
// FIXME: implement |
638 |
ERROR("FIXME: Should have properly computed the user state") |
639 |
ensureUserState() |
640 |
self reply UserResponseGetState(userId, this._userState) |
641 |
} |
642 |
} |
643 |
|
644 |
override def postStop { |
645 |
DEBUG("Stopping, saving state") |
646 |
//saveUserState |
647 |
} |
648 |
|
649 |
private[this] def DEBUG(fmt: String, args: Any*) = |
650 |
logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
651 |
|
652 |
private[this] def INFO(fmt: String, args: Any*) = |
653 |
logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
654 |
|
655 |
private[this] def WARN(fmt: String, args: Any*) = |
656 |
logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
657 |
|
658 |
private[this] def ERROR(fmt: String, args: Any*) = |
659 |
logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
660 |
} |