root / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala @ 6b4ecc99
History | View | Annotate | Download (31.2 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.user.actor |
37 |
|
38 |
import gr.grnet.aquarium.actor._ |
39 |
import gr.grnet.aquarium.Configurator |
40 |
import gr.grnet.aquarium.processor.actor._ |
41 |
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} |
42 |
import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting} |
43 |
import gr.grnet.aquarium.user._ |
44 |
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent} |
45 |
import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLResource, DSLSimpleResource, DSLComplexResource} |
46 |
import java.util.Date |
47 |
import gr.grnet.aquarium.util.{DateUtils, TimeHelpers, Loggable} |
48 |
|
49 |
|
50 |
/** |
51 |
* |
52 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
53 |
*/ |
54 |
|
55 |
class UserActor extends AquariumActor |
56 |
with Loggable with Accounting with DateUtils { |
57 |
@volatile |
58 |
private[this] var _userId: String = _ |
59 |
@volatile |
60 |
private[this] var _userState: UserState = _ |
61 |
@volatile |
62 |
private[this] var _timestampTheshold: Long = _ |
63 |
|
64 |
def role = UserActorRole |
65 |
|
66 |
private[this] def _configurator: Configurator = Configurator.MasterConfigurator |
67 |
|
68 |
private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = { |
69 |
if(checkForOlderEvents) { |
70 |
DEBUG("Checking for events older than %s" format resourceEvent) |
71 |
processOlderResourceEvents(resourceEvent) |
72 |
} |
73 |
|
74 |
justProcessTheResourceEvent(resourceEvent, "ACTUAL") |
75 |
} |
76 |
|
77 |
|
78 |
/** |
79 |
* Given an "onoff" event, we try to locate all unprocessed resource events that precede this one. |
80 |
*/ |
81 |
def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
82 |
Nil |
83 |
} |
84 |
|
85 |
def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
86 |
Nil |
87 |
} |
88 |
|
89 |
/** |
90 |
* Find resource events that precede the given one and are unprocessed. |
91 |
*/ |
92 |
private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = { |
93 |
if(rcEvent.isOnOffEvent(policy)) { |
94 |
findOlderResourceEventsForOnOff(rcEvent, policy) |
95 |
} else { |
96 |
findOlderResourceEventsForOther(rcEvent, policy) |
97 |
} |
98 |
} |
99 |
|
100 |
/** |
101 |
* Terminology: |
102 |
* |
103 |
* - Credits |
104 |
* The analog of money. Credits are the `universal money` within Aquarium. |
105 |
* |
106 |
* - Resource |
107 |
* A billable/chargeable entity. We generally need credits to use a resource. When a resource is used, |
108 |
* then consume credits. Examples of resources are the `download bandwidth` and, respectively, |
109 |
* the 'upload bandwidth', the `disk space` and the `VM time` to name a few. |
110 |
* |
111 |
* - User |
112 |
* An owner of resources and credits. |
113 |
* |
114 |
* - Resource event |
115 |
* An event that is generated by a system, which is responsible for the resource and describes a |
116 |
* state change for the resource. In particular, a resource event records the time when that state |
117 |
* change occurred (`occurredMillis` attribute) and the changed value (`value` attribute). |
118 |
* |
119 |
* - Resource event store |
120 |
* A datatabase, in the general sense, where resource events are stored. |
121 |
* |
122 |
* - Cost policy |
123 |
* A cost policy refers to a resource and it is the policy used in order to charge credits for |
124 |
* resource usage. |
125 |
* |
126 |
* - "On/Off" cost policy |
127 |
* A particular cost policy for resource events that come in "on"/"off" pairs. |
128 |
* In this case the respective resource can only be (from the accounting/billing perspective) in two |
129 |
* states, namely "on" and "off". The respective resource events record the resource changes between |
130 |
* these two states. It is not the responsibility of Aquarium to guarantee correct interleaving of |
131 |
* "on"/"off" resource events. |
132 |
* |
133 |
* - "On/Off" resource event |
134 |
* A resource event for which the associated resource has an "On/Off" cost policy. |
135 |
* |
136 |
* - "On" resource event |
137 |
* An "on/off" event which actually records the "on" state for the particular resource it refers to. |
138 |
* |
139 |
* - "Off" resource event |
140 |
* An "on/off" event which actually records the "off" state for the particular resource it refers to. |
141 |
* |
142 |
* - Wallet entry |
143 |
* The entity that describes an accounting event and its corresponding credit value. |
144 |
* For example, downloading files uses the download bandwidth and this costs credits. The exact |
145 |
* cost, which is determined using the corresponding cost policy of the event, is recorded |
146 |
* in one or more wallet entries. |
147 |
* |
148 |
* - Finalized wallet entry |
149 |
* It is a wallet entry whose recorded credits (its credit value) can be taken into account for |
150 |
* billing the respective user (the resource owner). |
151 |
* When we say that a wallet entry is in a finalized state, we mean that it is a finalized wallet entry, as |
152 |
* described above. |
153 |
* |
154 |
* - Pending or Non-finalized wallet entry |
155 |
* It is a wallet entry whose recorded credits (probably having a zero value) cannot be taken into |
156 |
* account for billing the respective resource owner. Pending wallet entries are introduced to handle |
157 |
* the "on/off" resource events. |
158 |
* When we say that a wallet entry is in/having a pending state, we mean that it is in a pending or |
159 |
* non-finalized wallet entry, as described above. |
160 |
* |
161 |
* - Wallet store |
162 |
* A database, in the general sense, where wallet entries are stored. |
163 |
* |
164 |
* - User Bill |
165 |
* A, usually, periodic statement of how many credits the user has consumed. |
166 |
* It may contain detailed analysis that relates consumed credits to resources. |
167 |
* |
168 |
* - Billable or Chargeable wallet entry |
169 |
* A wallet entry that can participate in the creation of the user Bill. |
170 |
* A wallet entry is billable if and only if it is finalized. |
171 |
* |
172 |
* - Processed resource event |
173 |
* A resource event which has generated wallet entries. |
174 |
* We may also designate a processed resource event as `partially processed` in order to distinguish it |
175 |
* from a `fully processed` resource event. |
176 |
* |
177 |
* - Fully processed resource event |
178 |
* A resource event which has no pending wallet entries or has no pending wallet entries with corresponding new |
179 |
* and finalized wallet entries (the meaning of the `corresponding` wallet entries will become |
180 |
* obvious in action A2 below). |
181 |
* |
182 |
* |
183 |
* When an event is processed, there are three possible actions as an outcome, namely A1, A2, A3: |
184 |
* |
185 |
* - A1. Wallet entries are created. This happens *always*. Each resource event may create more than one |
186 |
* wallet entries. The credit value associated with each wallet entry may be zero credits but we |
187 |
* always generate wallet entries even those with zero credit values. |
188 |
* |
189 |
* We say that the event that causes the generation of wallet entries is the `generator` event. |
190 |
* |
191 |
* The semantics of wallet entry creation depend on whether the original event is an "on/off" event |
192 |
* or not. |
193 |
* |
194 |
* - If the event is an "on/off" event, then |
195 |
* - If it is an "on" event, then one and only one new wallet entry is created having a pending state. |
196 |
* The credit value of this wallet entry is zero and its `occurredMillis` attribute is the same |
197 |
* as the `occurredMillis` attribute of the generator event. |
198 |
* |
199 |
* So what we actually do in this case is to record the "on" state with a non-billable wallet entry. |
200 |
* ==> TODO: How is tis related to the (later) expecting "off" state and corresponding (to the "off" |
201 |
* TODO: state) wallet entries? |
202 |
* |
203 |
* - If it is an "off" event, then we need to find the corresponding "on" event. |
204 |
* As long as we have the corresponding "on" event, then based on the relative timespan between |
205 |
* the two "on/off" events and the corresponding resource policy we compute a series of *new* |
206 |
* and *finalized* wallet entries. Each new wallet entry has an `occurredMillis` attribute equal to the |
207 |
* `occurredMillis` attribute of the original, pending, wallet entry. We then say that the new |
208 |
* wallet entries _correspond_ to the original, pending wallet entry. |
209 |
* ==> TODO: verify that the semantics of `occurredMillis` for the new wallet entries, as defined |
210 |
* TODO: above, are correct. |
211 |
* |
212 |
* ==> TODO: What do we do with the old, pending wallet entry? Do we keep it or do we delete it? |
213 |
* |
214 |
* The new wallet entries are generated according to Algorithm W2. |
215 |
* |
216 |
* - If the event is not an "on/off" event then, by design, it can directly generate one or more |
217 |
* finalized wallet entries. These wallet entries are generated by Algorithm W1. |
218 |
* |
219 |
* - A2. As a byproduct of the above, the user credits total is updated. |
220 |
* |
221 |
* For this, we just use the chargeable wallet entries created from action A1. Non-chargeable wallet |
222 |
* entries are ignored completely. |
223 |
* |
224 |
* - A3. Relevant resource state (some value) is updated. For example, the total bandwidth up value is |
225 |
* increased by the amount. |
226 |
* |
227 |
* Since a resource event records the changed value for the resource, we can easily update the current |
228 |
* value for the resource. |
229 |
* |
230 |
* Under ideal circumstances, whenever a resource event arrives, we immediately process it and, so, the steps |
231 |
* described in the above actions are what it only takes to get to the new state. |
232 |
* |
233 |
* But it may so happen that the above event processing procedure may be interrupted. For example, an Aquarium |
234 |
* component or external dependency may fail. In such cases, when a resource event arrives we cannot safely assume |
235 |
* that it is the most recent unprocessed event. It is then that we have to take action. And since, in general |
236 |
* and from the perspective of a newly arriving resource event we are not certain if a failure has happened the |
237 |
* following assumption is valid: |
238 |
* |
239 |
* - For each arriving resource event, it is possible that there exist events that arrived previously but which |
240 |
* have not been partially or fully processed. |
241 |
* |
242 |
* So, on arrival of a new event, we need to search our event and wallet stores to find those unprocessed events |
243 |
* and process them in succession, as if they are newly arrived. |
244 |
* |
245 |
* In effect and in the most general case, we never process one event at a time but more than one resource |
246 |
* events. Their processing order is the ascending order of their `occurredMillis` attribute. |
247 |
* ==> TODO: Verify that this semantics of the processing order is correct. |
248 |
* |
249 |
* |
250 |
* We will need the following algorithms: |
251 |
* |
252 |
* Algorithm W1: Given an non "On/Off" resource event, generate its wallet entries. |
253 |
* |
254 |
* Algorithm W2: Given an "Off" resource event and the corresponding "On" resource event, generate |
255 |
* the corresponding wallet entries. |
256 |
* |
257 |
* Algorithm OnOff: Given an "Off" resource event, find its corresponding "On" resource event. |
258 |
* |
259 |
* Algorithm F: Given a newly arrived resource event, find the exact list of all unprocessed events up to the new |
260 |
* one. |
261 |
* Algorithm P: Process a resource event as if it is the most recent unprocessed one. |
262 |
* |
263 |
* The implementations are as follows: |
264 |
* |
265 |
* ============ |
266 |
* Algorithm W1 |
267 |
* ============ |
268 |
* - Input |
269 |
* - A non-"on/off" resource event |
270 |
* |
271 |
* - Output |
272 |
* - The respective wallet entries |
273 |
* |
274 |
* - Implementation |
275 |
* TODO: This is done in Accounting.chargeEvents |
276 |
* |
277 |
* ============ |
278 |
* Algorithm W2 |
279 |
* ============ |
280 |
* - Input |
281 |
* - An "off" resource event |
282 |
* - The corresponding "on" resource event |
283 |
* |
284 |
* - Output |
285 |
* - The respective wallet entries |
286 |
* |
287 |
* - Implementation |
288 |
* TODO: Trivial |
289 |
* |
290 |
* =============== |
291 |
* Algorithm OnOff |
292 |
* =============== |
293 |
* - Input |
294 |
* |
295 |
* =========== |
296 |
* Algorithm F |
297 |
* =========== |
298 |
* Input: A resource event (e). |
299 |
* Output: A list (l) of all unprocessed resource events up to (e). |
300 |
* |
301 |
* |
302 |
* |
303 |
*/ |
304 |
private[this] def thisIsJustForTheDoc: Unit = { |
305 |
|
306 |
} |
307 |
|
308 |
/** |
309 |
* Find and process older resource events. |
310 |
* |
311 |
* Older resource events are found based on the latest credit calculation, that is the latest |
312 |
* wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries |
313 |
* have been calculated for these resource events and start from there. |
314 |
*/ |
315 |
private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = { |
316 |
assert(_userId == resourceEvent.userId) |
317 |
val rceId = resourceEvent.id |
318 |
val userId = resourceEvent.userId |
319 |
val resourceEventStore = _configurator.resourceEventStore |
320 |
val walletEntriesStore = _configurator.walletStore |
321 |
|
322 |
// 1. Find latest wallet entry |
323 |
val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId) |
324 |
latestWalletEntriesM match { |
325 |
case Just(latestWalletEntries) ⇒ |
326 |
// The time on which we base the selection of the older events |
327 |
val selectionTime = latestWalletEntries.head.occurredMillis |
328 |
|
329 |
// 2. Now find all resource events past the time of the latest wallet entry. |
330 |
// These events have not been processed, except probably those ones |
331 |
// that have the same `occurredMillis` with `selectionTime` |
332 |
val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime) |
333 |
|
334 |
// 3. Filter out those resource events for which no wallet entry has actually been |
335 |
// produced. |
336 |
val rcEventsToProcess = for { |
337 |
oldRCEvent <- oldRCEvents |
338 |
oldRCEventId = oldRCEvent.id |
339 |
latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId) |
340 |
} yield { |
341 |
oldRCEvent |
342 |
} |
343 |
|
344 |
DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent)) |
345 |
|
346 |
for { |
347 |
rcEventToProcess <- rcEventsToProcess |
348 |
} { |
349 |
justProcessTheResourceEvent(rcEventToProcess, "OLDER") |
350 |
} |
351 |
case NoVal ⇒ |
352 |
DEBUG("No events to process older than %s".format(resourceEvent)) |
353 |
case Failed(e, m) ⇒ |
354 |
ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage)) |
355 |
} |
356 |
} |
357 |
|
358 |
private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = { |
359 |
val walletEntriesStore = _configurator.walletStore |
360 |
for(walletEntry <- walletEntries) { |
361 |
val allow = walletEntry.finalized || allowNonFinalized |
362 |
if(allow) { |
363 |
walletEntriesStore.storeWalletEntry(walletEntry) |
364 |
} |
365 |
} |
366 |
} |
367 |
|
368 |
private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = { |
369 |
val newCredits = for { |
370 |
walletEntry <- walletEntries if(walletEntry.finalized) |
371 |
} yield walletEntry.value.toDouble |
372 |
|
373 |
newCredits.sum |
374 |
} |
375 |
|
376 |
/** |
377 |
* Process the resource event as if nothing else matters. Just do it. |
378 |
*/ |
379 |
private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = { |
380 |
val start = System.currentTimeMillis |
381 |
DEBUG("Processing [%s] %s".format(logLabel, ev)) |
382 |
|
383 |
// Initially, the user state (regarding resources) is empty. |
384 |
// So we have to compensate for both a totally empty resource state |
385 |
// and the update with new values. |
386 |
|
387 |
// 1. Find the resource definition |
388 |
val policy = Policy.policy |
389 |
policy.findResource(ev.resource) match { |
390 |
case Some(resource) ⇒ |
391 |
// 2. Get the instance id and value for the resource |
392 |
val instanceIdM = resource match { |
393 |
// 2.1 If it is complex, from the details map, get the field which holds the instanceId |
394 |
case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒ |
395 |
ev.details.get(descriminatorField) match { |
396 |
case Some(instanceId) ⇒ |
397 |
Just(instanceId) |
398 |
case None ⇒ |
399 |
// We should have some value under this key here.... |
400 |
Failed(throw new AccountingException("")) //TODO: See what to do here |
401 |
} |
402 |
// 2.2 If it is simple, ... |
403 |
case DSLSimpleResource(name, unit, costPolicy) ⇒ |
404 |
// ... by convention, the instanceId of a simple resource is just "1" |
405 |
// @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]] |
406 |
Just("1") |
407 |
} |
408 |
|
409 |
// 3. Did we get a valid instanceId? |
410 |
instanceIdM match { |
411 |
// 3.1 Yes, time to get/update the current state |
412 |
case Just(instanceId) ⇒ |
413 |
val oldOwnedResources = _userState.ownedResources |
414 |
|
415 |
// A. First state diff: the modified resource value |
416 |
val StateChangeMillis = TimeHelpers.nowMillis |
417 |
val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources. |
418 |
addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis) |
419 |
val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime) |
420 |
|
421 |
// Calculate the wallet entries generated from this resource event |
422 |
_userState.maybeDSLAgreement match { |
423 |
case Just(agreement) ⇒ |
424 |
val walletEntriesM = chargeEvent(ev, agreement, ev.value, |
425 |
new Date(previousRCUpdateTime), |
426 |
findRelatedEntries(resource, ev.getInstanceId(policy))) |
427 |
walletEntriesM match { |
428 |
case Just(walletEntries) ⇒ |
429 |
_storeWalletEntries(walletEntries, true) |
430 |
|
431 |
// B. Second state diff: the new credits |
432 |
val newCreditsSum = _calcNewCreditSum(walletEntries) |
433 |
val oldCredits = _userState.safeCredits.data |
434 |
val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis) |
435 |
|
436 |
// Finally, the userState change |
437 |
DEBUG("Credits = %s".format(this._userId, newCredits)) |
438 |
DEBUG("Resources = %s".format(this._userId, newOwnedResources)) |
439 |
this._userState = this._userState.copy( |
440 |
credits = newCredits, |
441 |
ownedResources = newOwnedResources |
442 |
) |
443 |
case NoVal ⇒ |
444 |
DEBUG("No wallet entries generated for %s".format(ev)) |
445 |
case failed @ Failed(e, m) ⇒ |
446 |
failed |
447 |
} |
448 |
|
449 |
case NoVal ⇒ |
450 |
Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId))) |
451 |
case failed @ Failed(e, m) ⇒ |
452 |
failed |
453 |
} |
454 |
// 3.2 No, no luck, this is an error |
455 |
case NoVal ⇒ |
456 |
Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId))) |
457 |
case failed @ Failed(e, m) ⇒ |
458 |
failed |
459 |
} |
460 |
// No resource definition found, this is an error |
461 |
case None ⇒ // Policy.policy.findResource(ev.resource) |
462 |
Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId))) |
463 |
} |
464 |
|
465 |
DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start)) |
466 |
} |
467 |
|
468 |
private[this] def processCreateUser(event: UserEvent): Unit = { |
469 |
val userId = event.userId |
470 |
DEBUG("Creating user from state %s", event) |
471 |
val usersDB = _configurator.storeProvider.userStateStore |
472 |
usersDB.findUserStateByUserId(userId) match { |
473 |
case Just(userState) ⇒ |
474 |
WARN("User already created, state = %s".format(userState)) |
475 |
case failed @ Failed(e, m) ⇒ |
476 |
ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m) |
477 |
case NoVal ⇒ |
478 |
// OK. Create a default UserState and store it |
479 |
val now = TimeHelpers.nowMillis |
480 |
val agreementOpt = Policy.policy.findAgreement("default") |
481 |
|
482 |
if(agreementOpt.isEmpty) { |
483 |
ERROR("No default agreement found. Cannot initialize user state") |
484 |
} else { |
485 |
this._userState = UserState( |
486 |
userId, |
487 |
ActiveSuspendedSnapshot(event.isStateActive, now), |
488 |
CreditSnapshot(0, now), |
489 |
AgreementSnapshot(agreementOpt.get.name, now), |
490 |
RolesSnapshot(event.roles, now), |
491 |
PaymentOrdersSnapshot(Nil, now), |
492 |
OwnedGroupsSnapshot(Nil, now), |
493 |
GroupMembershipsSnapshot(Nil, now), |
494 |
OwnedResourcesSnapshot(List(), now) |
495 |
) |
496 |
|
497 |
saveUserState |
498 |
DEBUG("Created and stored %s", this._userState) |
499 |
} |
500 |
} |
501 |
} |
502 |
|
503 |
private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = { |
504 |
val walletDB = _configurator.storeProvider.walletEntryStore |
505 |
walletDB.findPreviousEntry(_userId, res.name, instid, Some(false)) |
506 |
} |
507 |
|
508 |
|
509 |
private[this] def processModifyUser(event: UserEvent): Unit = { |
510 |
val now = TimeHelpers.nowMillis |
511 |
val newActive = ActiveSuspendedSnapshot(event.isStateActive, now) |
512 |
|
513 |
DEBUG("New active status = %s".format(newActive)) |
514 |
|
515 |
this._userState = this._userState.copy( active = newActive ) |
516 |
} |
517 |
/** |
518 |
* Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state. |
519 |
*/ |
520 |
private[this] def processUserEvent(event: UserEvent): Unit = { |
521 |
if(event.isCreateUser) { |
522 |
processCreateUser(event) |
523 |
} else if(event.isModifyUser) { |
524 |
processModifyUser(event) |
525 |
} |
526 |
} |
527 |
|
528 |
/** |
529 |
* Try to load from the DB the latest known info (snapshot data) for the given user. |
530 |
*/ |
531 |
private[this] def findUserState(userId: String): Maybe[UserState] = { |
532 |
val usersDB = _configurator.storeProvider.userStateStore |
533 |
usersDB.findUserStateByUserId(userId) |
534 |
} |
535 |
|
536 |
/** |
537 |
* Tries to makes sure that the internal user state exists. |
538 |
* |
539 |
* May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that. |
540 |
* |
541 |
*/ |
542 |
private[this] def ensureUserState(): Unit = { |
543 |
if(null eq this._userState) { |
544 |
findUserState(this._userId) match { |
545 |
case Just(userState) ⇒ |
546 |
DEBUG("Loaded user state %s from DB", userState) |
547 |
//TODO: May be out of sync with the event store, rebuild it here |
548 |
this._userState = userState |
549 |
rebuildState(this._userState.oldestSnapshotTime) |
550 |
case Failed(e, m) ⇒ |
551 |
ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m) |
552 |
case NoVal ⇒ |
553 |
//TODO: Rebuild actor state here. |
554 |
rebuildState(0) |
555 |
WARN("Request for unknown (to Aquarium) user") |
556 |
} |
557 |
} |
558 |
} |
559 |
|
560 |
/** |
561 |
* Replay the event log for all events that affect the user state, starting |
562 |
* from the provided time instant. |
563 |
*/ |
564 |
def rebuildState(from: Long): Unit = |
565 |
rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime) |
566 |
|
567 |
/** |
568 |
* Replay the event log for all events that affect the user state. |
569 |
*/ |
570 |
def rebuildState(from: Long, to: Long): Unit = { |
571 |
val start = System.currentTimeMillis() |
572 |
if (_userState == null) |
573 |
createBlankState |
574 |
|
575 |
//Rebuild state from user events |
576 |
val usersDB = _configurator.storeProvider.userEventStore |
577 |
val userEvents = usersDB.findUserEventsByUserId(_userId) |
578 |
val numUserEvents = userEvents.size |
579 |
_userState = replayUserEvents(_userState, userEvents, from, to) |
580 |
|
581 |
//Rebuild state from resource events |
582 |
val eventsDB = _configurator.storeProvider.resourceEventStore |
583 |
val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from) |
584 |
val numResourceEvents = resourceEvents.size |
585 |
_userState = replayResourceEvents(_userState, resourceEvents, from, to) |
586 |
|
587 |
//Rebuild state from wallet entries |
588 |
val wallet = _configurator.storeProvider.walletEntryStore |
589 |
val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to)) |
590 |
val numWalletEntries = walletEnties.size |
591 |
_userState = replayResourceEvents(_userState, resourceEvents, from, to) |
592 |
|
593 |
INFO(("Rebuilt state from %d events (%d user events, " + |
594 |
"%d resource events, %d wallet entries) in %d msec").format( |
595 |
numUserEvents + numResourceEvents + numWalletEntries, |
596 |
numUserEvents, numResourceEvents, numWalletEntries, |
597 |
System.currentTimeMillis() - start)) |
598 |
} |
599 |
|
600 |
/** |
601 |
* Create an empty state for a user |
602 |
*/ |
603 |
def createBlankState = { |
604 |
val now = 0 |
605 |
val agreement = Policy.policy.findAgreement("default") |
606 |
|
607 |
this._userState = UserState( |
608 |
_userId, |
609 |
ActiveSuspendedSnapshot(false, now), |
610 |
CreditSnapshot(0, now), |
611 |
AgreementSnapshot(agreement.get.name, now), |
612 |
RolesSnapshot(List(), now), |
613 |
PaymentOrdersSnapshot(Nil, now), |
614 |
OwnedGroupsSnapshot(Nil, now), |
615 |
GroupMembershipsSnapshot(Nil, now), |
616 |
OwnedResourcesSnapshot(List(), now) |
617 |
) |
618 |
} |
619 |
|
620 |
/** |
621 |
* Replay user events on the provided user state |
622 |
*/ |
623 |
def replayUserEvents(initState: UserState, events: List[UserEvent], |
624 |
from: Long, to: Long): UserState = { |
625 |
var act = initState.active |
626 |
var rol = initState.roles |
627 |
events |
628 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
629 |
.foreach { |
630 |
e => |
631 |
act = act.copy( |
632 |
data = e.isStateActive, snapshotTime = e.occurredMillis) |
633 |
// TODO: Implement the following |
634 |
//_userState.agreement = _userState.agreement.copy( |
635 |
// data = e.newAgreement, e.occurredMillis) |
636 |
|
637 |
rol = rol.copy(data = e.roles, |
638 |
snapshotTime = e.occurredMillis) |
639 |
} |
640 |
initState.copy(active = act, roles = rol) |
641 |
} |
642 |
|
643 |
/** |
644 |
* Replay resource events on the provided user state |
645 |
*/ |
646 |
def replayResourceEvents(initState: UserState, events: List[ResourceEvent], |
647 |
from: Long, to: Long): UserState = { |
648 |
var res = initState.ownedResources |
649 |
events |
650 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
651 |
.foreach { |
652 |
e => |
653 |
val name = Policy.policy.findResource(e.resource) match { |
654 |
case Some(x) => x.name |
655 |
case None => "" |
656 |
} |
657 |
|
658 |
val instanceId = e.getInstanceId(Policy.policy) |
659 |
res = res.addOrUpdateResourceSnapshot(name, |
660 |
instanceId, e.value, e.occurredMillis)._1 |
661 |
} |
662 |
initState.copy(ownedResources = res) |
663 |
} |
664 |
|
665 |
/** |
666 |
* Replay wallet entries on the provided user state |
667 |
*/ |
668 |
def replayWalletEntries(initState: UserState, events: List[WalletEntry], |
669 |
from: Long, to: Long): UserState = { |
670 |
var cred = initState.credits |
671 |
events |
672 |
.filter(e => e.occurredMillis >= from && e.occurredMillis < to) |
673 |
.foreach { |
674 |
w => cred = cred.copy(data = w.value, snapshotTime = w.occurredMillis) |
675 |
} |
676 |
initState.copy(credits = cred) |
677 |
} |
678 |
|
679 |
/** |
680 |
* Persist current user state |
681 |
*/ |
682 |
private[this] def saveUserState(): Unit = { |
683 |
_configurator.storeProvider.userStateStore.deleteUserState(this._userId) |
684 |
_configurator.storeProvider.userStateStore.storeUserState(this._userState) match { |
685 |
case Just(record) => record |
686 |
case NoVal => ERROR("Unknown error saving state") |
687 |
case Failed(e, a) => |
688 |
ERROR("Saving state failed: %s error was: %s".format(a,e)); |
689 |
} |
690 |
} |
691 |
|
692 |
protected def receive: Receive = { |
693 |
case m @ AquariumPropertiesLoaded(props) ⇒ |
694 |
this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000) |
695 |
INFO("Setup my timestampTheshold = %s", this._timestampTheshold) |
696 |
|
697 |
case m @ UserActorInitWithUserId(userId) ⇒ |
698 |
this._userId = userId |
699 |
DEBUG("Actor starting, loading state") |
700 |
ensureUserState() |
701 |
|
702 |
case m @ ProcessResourceEvent(resourceEvent) ⇒ |
703 |
if(resourceEvent.userId != this._userId) { |
704 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
705 |
} else { |
706 |
ensureUserState() |
707 |
processResourceEvent(resourceEvent, true) |
708 |
} |
709 |
|
710 |
case m @ ProcessUserEvent(userEvent) ⇒ |
711 |
if(userEvent.userId != this._userId) { |
712 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
713 |
} else { |
714 |
ensureUserState() |
715 |
processUserEvent(userEvent) |
716 |
} |
717 |
|
718 |
case m @ RequestUserBalance(userId, timestamp) ⇒ |
719 |
if(this._userId != userId) { |
720 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
721 |
// TODO: throw an exception here |
722 |
} else { |
723 |
// This is the big party. |
724 |
// Get the user state, if it exists and make sure it is not stale. |
725 |
ensureUserState() |
726 |
|
727 |
// Do we have a user state? |
728 |
if(_userState ne null) { |
729 |
// Yep, we do. See what there is inside it. |
730 |
val credits = _userState.credits |
731 |
val creditsTimestamp = credits.snapshotTime |
732 |
|
733 |
// Check if data is stale |
734 |
if(creditsTimestamp + _timestampTheshold > timestamp) { |
735 |
// No, it's OK |
736 |
self reply UserResponseGetBalance(userId, credits.data) |
737 |
} else { |
738 |
// Yep, data is stale and must recompute balance |
739 |
// FIXME: implement |
740 |
ERROR("FIXME: Should have computed a new value for %s".format(credits)) |
741 |
self reply UserResponseGetBalance(userId, credits.data) |
742 |
} |
743 |
} else { |
744 |
// No user state exists. This is an error. |
745 |
val errMsg = "Could not load user state for %s".format(m) |
746 |
ERROR(errMsg) |
747 |
self reply ResponseUserBalance(userId, 0, Some(errMsg)) |
748 |
} |
749 |
} |
750 |
|
751 |
case m @ UserRequestGetState(userId, timestamp) ⇒ |
752 |
if(this._userId != userId) { |
753 |
ERROR("Received %s but my userId = %s".format(m, this._userId)) |
754 |
// TODO: throw an exception here |
755 |
} else { |
756 |
// FIXME: implement |
757 |
ERROR("FIXME: Should have properly computed the user state") |
758 |
ensureUserState() |
759 |
self reply UserResponseGetState(userId, this._userState) |
760 |
} |
761 |
} |
762 |
|
763 |
override def postStop { |
764 |
DEBUG("Stopping, saving state") |
765 |
saveUserState |
766 |
} |
767 |
|
768 |
private[this] def DEBUG(fmt: String, args: Any*) = |
769 |
logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
770 |
|
771 |
private[this] def INFO(fmt: String, args: Any*) = |
772 |
logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
773 |
|
774 |
private[this] def WARN(fmt: String, args: Any*) = |
775 |
logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
776 |
|
777 |
private[this] def ERROR(fmt: String, args: Any*) = |
778 |
logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*))) |
779 |
} |