Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala @ 6b4ecc99

History | View | Annotate | Download (31.2 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.user.actor
37

    
38
import gr.grnet.aquarium.actor._
39
import gr.grnet.aquarium.Configurator
40
import gr.grnet.aquarium.processor.actor._
41
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
42
import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
43
import gr.grnet.aquarium.user._
44
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
45
import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLResource, DSLSimpleResource, DSLComplexResource}
46
import java.util.Date
47
import gr.grnet.aquarium.util.{DateUtils, TimeHelpers, Loggable}
48

    
49

    
50
/**
51
 *
52
 * @author Christos KK Loverdos <loverdos@gmail.com>
53
 */
54

    
55
class UserActor extends AquariumActor
56
  with Loggable with Accounting with DateUtils {
57
  @volatile
58
  private[this] var _userId: String = _
59
  @volatile
60
  private[this] var _userState: UserState = _
61
  @volatile
62
  private[this] var _timestampTheshold: Long = _
63

    
64
  def role = UserActorRole
65

    
66
  private[this] def _configurator: Configurator = Configurator.MasterConfigurator
67

    
68
  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
69
    if(checkForOlderEvents) {
70
      DEBUG("Checking for events older than %s" format resourceEvent)
71
      processOlderResourceEvents(resourceEvent)
72
    }
73

    
74
    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
75
  }
76

    
77

    
78
  /**
79
   * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
80
   */
81
  def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
82
    Nil
83
  }
84

    
85
  def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
86
    Nil
87
  }
88

    
89
  /**
90
   * Find resource events that precede the given one and are unprocessed.
91
   */
92
  private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
93
    if(rcEvent.isOnOffEvent(policy)) {
94
      findOlderResourceEventsForOnOff(rcEvent, policy)
95
    } else {
96
      findOlderResourceEventsForOther(rcEvent, policy)
97
    }
98
  }
99

    
100
  /**
101
   * Terminology:
102
   *
103
   * - Credits
104
   *   The analog of money. Credits are the `universal money` within Aquarium.
105
   *
106
   * - Resource
107
   *   A billable/chargeable entity. We generally need credits to use a resource. When a resource is used,
108
   *   then consume credits. Examples of resources are the `download bandwidth` and, respectively,
109
   *   the 'upload bandwidth', the `disk space` and the `VM time` to name a few.
110
   *
111
   * - User
112
   *   An owner of resources and credits.
113
   *
114
   * - Resource event
115
   *   An event that is generated by a system, which is responsible for the resource and describes a
116
   *   state change for the resource. In particular, a resource event records the time when that state
117
   *   change occurred (`occurredMillis` attribute) and the changed value (`value` attribute).
118
   *
119
   * - Resource event store
120
   *   A datatabase, in the general sense, where resource events are stored.
121
   *
122
   * - Cost policy
123
   *   A cost policy refers to a resource and it is the policy used in order to charge credits for
124
   *   resource usage.
125
   *
126
   * - "On/Off" cost policy
127
   *   A particular cost policy for resource events that come in "on"/"off" pairs.
128
   *   In this case the respective resource can only be (from the accounting/billing perspective) in two
129
   *   states, namely "on" and "off". The respective resource events record the resource changes between
130
   *   these two states. It is not the responsibility of Aquarium to guarantee correct interleaving of
131
   *   "on"/"off" resource events.
132
   *
133
   * - "On/Off" resource event
134
   *   A resource event for which the associated resource has an "On/Off" cost policy.
135
   *
136
   * - "On" resource event
137
   *   An "on/off" event which actually records the "on" state for the particular resource it refers to.
138
   *
139
   * - "Off" resource event
140
   *   An "on/off" event which actually records the "off" state for the particular resource it refers to.
141
   *
142
   * - Wallet entry
143
   *   The entity that describes an accounting event and its corresponding credit value.
144
   *   For example, downloading files uses the download bandwidth and this costs credits. The exact
145
   *   cost, which is determined using the corresponding cost policy of the event, is recorded
146
   *   in one or more wallet entries.
147
   *
148
   * - Finalized wallet entry
149
   *   It is a wallet entry whose recorded credits (its credit value) can be taken into account for
150
   *   billing the respective user (the resource owner).
151
   *   When we say that a wallet entry is in a finalized state, we mean that it is a finalized wallet entry, as
152
   *   described above.
153
   *
154
   * - Pending or Non-finalized wallet entry
155
   *   It is a wallet entry whose recorded credits (probably having a zero value) cannot be taken into
156
   *   account for billing the respective resource owner. Pending wallet entries are introduced to handle
157
   *   the "on/off" resource events.
158
   *   When we say that a wallet entry is in/having a pending state, we mean that it is in a pending or
159
   *   non-finalized wallet entry, as described above.
160
   *
161
   * - Wallet store
162
   *   A database, in the general sense, where wallet entries are stored.
163
   *
164
   * - User Bill
165
   *   A, usually, periodic statement of how many credits the user has consumed.
166
   *   It may contain detailed analysis that relates consumed credits to resources.
167
   *
168
   * - Billable or Chargeable wallet entry
169
   *   A wallet entry that can participate in the creation of the user Bill.
170
   *   A wallet entry is billable if and only if it is finalized.
171
   *
172
   * - Processed resource event
173
   *   A resource event which has generated wallet entries.
174
   *   We may also designate a processed resource event as `partially processed` in order to distinguish it
175
   *   from a `fully processed` resource event.
176
   *
177
   * - Fully processed resource event
178
   *   A resource event which has no pending wallet entries or has no pending wallet entries with corresponding new
179
   *   and finalized wallet entries (the meaning of the `corresponding` wallet entries will become
180
   *   obvious in action A2 below).
181
   *
182
   *
183
   * When an event is processed, there are three possible actions as an outcome, namely A1, A2, A3:
184
   *
185
   * - A1. Wallet entries are created. This happens *always*. Each resource event may create more than one
186
   *       wallet entries. The credit value associated with each wallet entry may be zero credits but we
187
   *       always generate wallet entries even those with zero credit values.
188
   *
189
   *       We say that the event that causes the generation of wallet entries is the `generator` event.
190
   *
191
   *       The semantics of wallet entry creation depend on whether the original event is an "on/off" event
192
   *       or not.
193
   *
194
   *       - If the event is an "on/off" event, then
195
   *         - If it is an "on" event, then one and only one new wallet entry is created having a pending state.
196
   *           The credit value of this wallet entry is zero and its `occurredMillis` attribute is the same
197
   *           as the `occurredMillis` attribute of the generator event.
198
   *
199
   *           So what we actually do in this case is to record the "on" state with a non-billable wallet entry.
200
   *           ==> TODO: How is tis related to the (later) expecting "off" state and corresponding (to the "off"
201
   *               TODO: state) wallet entries?
202
   *
203
   *         - If it is an "off" event, then we need to find the corresponding "on" event.
204
   *           As long as we have the corresponding "on" event, then based on the relative timespan between
205
   *           the two "on/off" events and the corresponding resource policy we compute a series of *new*
206
   *           and *finalized* wallet entries. Each new wallet entry has an `occurredMillis` attribute equal to the
207
   *           `occurredMillis` attribute of the original, pending, wallet entry. We then say that the new
208
   *           wallet entries _correspond_ to the original, pending wallet entry.
209
   *           ==> TODO: verify that the semantics of `occurredMillis` for the new wallet entries, as defined
210
   *               TODO: above, are correct.
211
   *
212
   *           ==> TODO: What do we do with the old, pending wallet entry? Do we keep it or do we delete it?
213
   *
214
   *           The new wallet entries are generated according to Algorithm W2.
215
   *
216
   *       - If the event is not an "on/off" event then, by design, it can directly generate one or more
217
   *         finalized wallet entries. These wallet entries are generated by Algorithm W1.
218
   *
219
   * - A2. As a byproduct of the above, the user credits total is updated.
220
   *
221
   *       For this, we just use the chargeable wallet entries created from action A1. Non-chargeable wallet
222
   *       entries are ignored completely.
223
   *
224
   * - A3. Relevant resource state (some value) is updated. For example, the total bandwidth up value is
225
   *       increased by the amount.
226
   *
227
   *       Since a resource event records the changed value for the resource, we can easily update the current
228
   *       value for the resource.
229
   *
230
   * Under ideal circumstances, whenever a resource event arrives, we immediately process it and, so, the steps
231
   * described in the above actions are what it only takes to get to the new state.
232
   *
233
   * But it may so happen that the above event processing procedure may be interrupted. For example, an Aquarium
234
   * component or external dependency may fail. In such cases, when a resource event arrives we cannot safely assume
235
   * that it is the most recent unprocessed event. It is then that we have to take action. And since, in general
236
   * and from the perspective of a newly arriving resource event we are not certain if a failure has happened the
237
   * following assumption is valid:
238
   *
239
   * - For each arriving resource event, it is possible that there exist events that arrived previously but which
240
   *   have not been partially or fully processed.
241
   *
242
   * So, on arrival of a new event, we need to search our event and wallet stores to find those unprocessed events
243
   * and process them in succession, as if they are newly arrived.
244
   *
245
   * In effect and in the most general case, we never process one event at a time but more than one resource
246
   * events. Their processing order is the ascending order of their `occurredMillis` attribute.
247
   * ==> TODO: Verify that this semantics of the processing order is correct.
248
   *
249
   *
250
   * We will need the following algorithms:
251
   *
252
   * Algorithm W1: Given an non "On/Off" resource event, generate its wallet entries.
253
   *
254
   * Algorithm W2: Given an "Off" resource event and the corresponding "On" resource event, generate
255
   *               the corresponding wallet entries.
256
   *
257
   * Algorithm OnOff: Given an "Off" resource event, find its corresponding "On" resource event.
258
   *
259
   * Algorithm F: Given a newly arrived resource event, find the exact list of all unprocessed events up to the new
260
   *              one.
261
   * Algorithm P: Process a resource event as if it is the most recent unprocessed one.
262
   *
263
   * The implementations are as follows:
264
   *
265
   * ============
266
   * Algorithm W1
267
   * ============
268
   * - Input
269
   *   - A non-"on/off" resource event
270
   *
271
   * - Output
272
   *   - The respective wallet entries
273
   *
274
   * - Implementation
275
   *   TODO: This is done in Accounting.chargeEvents
276
   *
277
   * ============
278
   * Algorithm W2
279
   * ============
280
   * - Input
281
   *   - An "off" resource event
282
   *   - The corresponding "on" resource event
283
   *
284
   * - Output
285
   *   - The respective wallet entries
286
   *
287
   * - Implementation
288
   *   TODO: Trivial
289
   *
290
   * ===============
291
   * Algorithm OnOff
292
   * ===============
293
   * - Input
294
   *
295
   * ===========
296
   * Algorithm F
297
   * ===========
298
   * Input:  A resource event (e).
299
   * Output: A list (l) of all unprocessed resource events up to (e).
300
   *
301
   *
302
   *
303
   */
304
  private[this] def thisIsJustForTheDoc: Unit = {
305

    
306
  }
307

    
308
  /**
309
   * Find and process older resource events.
310
   *
311
   * Older resource events are found based on the latest credit calculation, that is the latest
312
   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
313
   * have been calculated for these resource events and start from there.
314
   */
315
  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
316
    assert(_userId == resourceEvent.userId)
317
    val rceId = resourceEvent.id
318
    val userId = resourceEvent.userId
319
    val resourceEventStore = _configurator.resourceEventStore
320
    val walletEntriesStore = _configurator.walletStore
321

    
322
    // 1. Find latest wallet entry
323
    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
324
    latestWalletEntriesM match {
325
      case Just(latestWalletEntries) ⇒
326
        // The time on which we base the selection of the older events
327
        val selectionTime = latestWalletEntries.head.occurredMillis
328

    
329
        // 2. Now find all resource events past the time of the latest wallet entry.
330
        //    These events have not been processed, except probably those ones
331
        //    that have the same `occurredMillis` with `selectionTime`
332
        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
333

    
334
        // 3. Filter out those resource events for which no wallet entry has actually been
335
        //    produced.
336
        val rcEventsToProcess = for {
337
          oldRCEvent        <- oldRCEvents
338
          oldRCEventId      = oldRCEvent.id
339
          latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
340
        } yield {
341
          oldRCEvent
342
        }
343

    
344
        DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
345

    
346
        for {
347
          rcEventToProcess <- rcEventsToProcess
348
        } {
349
          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
350
        }
351
      case NoVal ⇒
352
        DEBUG("No events to process older than %s".format(resourceEvent))
353
      case Failed(e, m) ⇒
354
        ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
355
    }
356
  }
357

    
358
  private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = {
359
    val walletEntriesStore = _configurator.walletStore
360
    for(walletEntry <- walletEntries) {
361
      val allow = walletEntry.finalized || allowNonFinalized
362
      if(allow) {
363
        walletEntriesStore.storeWalletEntry(walletEntry)
364
      }
365
    }
366
  }
367

    
368
  private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
369
    val newCredits = for {
370
      walletEntry <- walletEntries if(walletEntry.finalized)
371
    } yield walletEntry.value.toDouble
372

    
373
    newCredits.sum
374
  }
375

    
376
  /**
377
   * Process the resource event as if nothing else matters. Just do it.
378
   */
379
  private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
380
    val start = System.currentTimeMillis
381
    DEBUG("Processing [%s] %s".format(logLabel, ev))
382

    
383
    // Initially, the user state (regarding resources) is empty.
384
    // So we have to compensate for both a totally empty resource state
385
    // and the update with new values.
386

    
387
    // 1. Find the resource definition
388
    val policy = Policy.policy
389
    policy.findResource(ev.resource) match {
390
      case Some(resource) ⇒
391
        // 2. Get the instance id and value for the resource
392
        val instanceIdM = resource match {
393
          // 2.1 If it is complex, from the details map, get the field which holds the instanceId
394
          case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
395
            ev.details.get(descriminatorField) match {
396
              case Some(instanceId) ⇒
397
                Just(instanceId)
398
              case None ⇒
399
                // We should have some value under this key here....
400
                Failed(throw new AccountingException("")) //TODO: See what to do here
401
            }
402
          // 2.2 If it is simple, ...
403
          case DSLSimpleResource(name, unit, costPolicy) ⇒
404
            // ... by convention, the instanceId of a simple resource is just "1"
405
            // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
406
            Just("1")
407
        }
408

    
409
        // 3. Did we get a valid instanceId?
410
        instanceIdM match {
411
          // 3.1 Yes, time to get/update the current state
412
          case Just(instanceId) ⇒
413
            val oldOwnedResources = _userState.ownedResources
414

    
415
            // A. First state diff: the modified resource value
416
            val StateChangeMillis = TimeHelpers.nowMillis
417
            val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources.
418
              addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis)
419
            val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime)
420

    
421
            // Calculate the wallet entries generated from this resource event
422
            _userState.maybeDSLAgreement match {
423
              case Just(agreement) ⇒
424
                val walletEntriesM = chargeEvent(ev, agreement, ev.value,
425
                  new Date(previousRCUpdateTime),
426
                  findRelatedEntries(resource, ev.getInstanceId(policy)))
427
                walletEntriesM match {
428
                  case Just(walletEntries) ⇒
429
                    _storeWalletEntries(walletEntries, true)
430

    
431
                    // B. Second state diff: the new credits
432
                    val newCreditsSum = _calcNewCreditSum(walletEntries)
433
                    val oldCredits    = _userState.safeCredits.data
434
                    val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
435

    
436
                    // Finally, the userState change
437
                    DEBUG("Credits   = %s".format(this._userId, newCredits))
438
                    DEBUG("Resources = %s".format(this._userId, newOwnedResources))
439
                    this._userState = this._userState.copy(
440
                      credits = newCredits,
441
                      ownedResources = newOwnedResources
442
                    )
443
                  case NoVal ⇒
444
                    DEBUG("No wallet entries generated for %s".format(ev))
445
                  case failed @ Failed(e, m) ⇒
446
                    failed
447
                }
448
                
449
              case NoVal ⇒
450
                Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
451
              case failed @ Failed(e, m) ⇒
452
                failed
453
            }
454
          // 3.2 No, no luck, this is an error
455
          case NoVal ⇒
456
            Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
457
          case failed @ Failed(e, m) ⇒
458
            failed
459
        }
460
      // No resource definition found, this is an error
461
      case None ⇒ // Policy.policy.findResource(ev.resource)
462
        Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
463
    }
464

    
465
    DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
466
  }
467

    
468
  private[this] def processCreateUser(event: UserEvent): Unit = {
469
    val userId = event.userId
470
    DEBUG("Creating user from state %s", event)
471
    val usersDB = _configurator.storeProvider.userStateStore
472
    usersDB.findUserStateByUserId(userId) match {
473
      case Just(userState) ⇒
474
        WARN("User already created, state = %s".format(userState))
475
      case failed @ Failed(e, m) ⇒
476
        ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
477
      case NoVal ⇒
478
        // OK. Create a default UserState and store it
479
        val now = TimeHelpers.nowMillis
480
        val agreementOpt = Policy.policy.findAgreement("default")
481
        
482
        if(agreementOpt.isEmpty) {
483
          ERROR("No default agreement found. Cannot initialize user state")
484
        } else {
485
          this._userState = UserState(
486
            userId,
487
            ActiveSuspendedSnapshot(event.isStateActive, now),
488
            CreditSnapshot(0, now),
489
            AgreementSnapshot(agreementOpt.get.name, now),
490
            RolesSnapshot(event.roles, now),
491
            PaymentOrdersSnapshot(Nil, now),
492
            OwnedGroupsSnapshot(Nil, now),
493
            GroupMembershipsSnapshot(Nil, now),
494
            OwnedResourcesSnapshot(List(), now)
495
          )
496

    
497
          saveUserState
498
          DEBUG("Created and stored %s", this._userState)
499
        }
500
    }
501
  }
502

    
503
  private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
504
    val walletDB = _configurator.storeProvider.walletEntryStore
505
    walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
506
  }
507

    
508

    
509
  private[this] def processModifyUser(event: UserEvent): Unit = {
510
    val now = TimeHelpers.nowMillis
511
    val newActive = ActiveSuspendedSnapshot(event.isStateActive, now)
512

    
513
    DEBUG("New active status = %s".format(newActive))
514

    
515
    this._userState = this._userState.copy( active = newActive )
516
  }
517
  /**
518
   * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
519
   */
520
  private[this] def processUserEvent(event: UserEvent): Unit = {
521
    if(event.isCreateUser) {
522
      processCreateUser(event)
523
    } else if(event.isModifyUser) {
524
      processModifyUser(event)
525
    }
526
  }
527

    
528
  /**
529
   * Try to load from the DB the latest known info (snapshot data) for the given user.
530
   */
531
  private[this] def findUserState(userId: String): Maybe[UserState] = {
532
    val usersDB = _configurator.storeProvider.userStateStore
533
    usersDB.findUserStateByUserId(userId)
534
  }
535

    
536
  /**
537
   * Tries to makes sure that the internal user state exists.
538
   *
539
   * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
540
   *
541
   */
542
  private[this] def ensureUserState(): Unit = {
543
    if(null eq this._userState) {
544
      findUserState(this._userId) match {
545
        case Just(userState) ⇒
546
          DEBUG("Loaded user state %s from DB", userState)
547
          //TODO: May be out of sync with the event store, rebuild it here
548
          this._userState = userState
549
          rebuildState(this._userState.oldestSnapshotTime)
550
        case Failed(e, m) ⇒
551
          ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
552
        case NoVal ⇒
553
          //TODO: Rebuild actor state here.
554
          rebuildState(0)
555
          WARN("Request for unknown (to Aquarium) user")
556
      }
557
    }
558
  }
559

    
560
  /**
561
   * Replay the event log for all events that affect the user state, starting
562
   * from the provided time instant.
563
   */
564
  def rebuildState(from: Long): Unit =
565
    rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
566

    
567
  /**
568
   * Replay the event log for all events that affect the user state.
569
   */
570
  def rebuildState(from: Long, to: Long): Unit = {
571
    val start = System.currentTimeMillis()
572
    if (_userState == null)
573
      createBlankState
574

    
575
    //Rebuild state from user events
576
    val usersDB = _configurator.storeProvider.userEventStore
577
    val userEvents = usersDB.findUserEventsByUserId(_userId)
578
    val numUserEvents = userEvents.size
579
    _userState = replayUserEvents(_userState, userEvents, from, to)
580
      
581
    //Rebuild state from resource events
582
    val eventsDB = _configurator.storeProvider.resourceEventStore
583
    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
584
    val numResourceEvents = resourceEvents.size
585
    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
586

    
587
    //Rebuild state from wallet entries
588
    val wallet = _configurator.storeProvider.walletEntryStore
589
    val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to))
590
    val numWalletEntries = walletEnties.size
591
    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
592

    
593
    INFO(("Rebuilt state from %d events (%d user events, " +
594
      "%d resource events, %d wallet entries) in %d msec").format(
595
      numUserEvents + numResourceEvents + numWalletEntries,
596
      numUserEvents, numResourceEvents, numWalletEntries,
597
      System.currentTimeMillis() - start))
598
  }
599

    
600
  /**
601
   * Create an empty state for a user
602
   */
603
  def createBlankState = {
604
    val now = 0
605
    val agreement = Policy.policy.findAgreement("default")
606

    
607
    this._userState = UserState(
608
      _userId,
609
      ActiveSuspendedSnapshot(false, now),
610
      CreditSnapshot(0, now),
611
      AgreementSnapshot(agreement.get.name, now),
612
      RolesSnapshot(List(), now),
613
      PaymentOrdersSnapshot(Nil, now),
614
      OwnedGroupsSnapshot(Nil, now),
615
      GroupMembershipsSnapshot(Nil, now),
616
      OwnedResourcesSnapshot(List(), now)
617
    )
618
  }
619

    
620
  /**
621
   * Replay user events on the provided user state
622
   */
623
  def replayUserEvents(initState: UserState, events: List[UserEvent],
624
                       from: Long, to: Long): UserState = {
625
    var act = initState.active
626
    var rol = initState.roles
627
    events
628
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
629
      .foreach {
630
        e =>
631
          act = act.copy(
632
            data = e.isStateActive, snapshotTime = e.occurredMillis)
633
          // TODO: Implement the following
634
          //_userState.agreement = _userState.agreement.copy(
635
          //  data = e.newAgreement, e.occurredMillis)
636

    
637
          rol = rol.copy(data = e.roles,
638
            snapshotTime = e.occurredMillis)
639
    }
640
    initState.copy(active = act, roles = rol)
641
  }
642

    
643
  /**
644
   * Replay resource events on the provided user state
645
   */
646
  def replayResourceEvents(initState: UserState, events: List[ResourceEvent],
647
                           from: Long, to: Long): UserState = {
648
    var res = initState.ownedResources
649
    events
650
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
651
      .foreach {
652
        e =>
653
          val name = Policy.policy.findResource(e.resource) match {
654
            case Some(x) => x.name
655
            case None => ""
656
          }
657

    
658
          val instanceId = e.getInstanceId(Policy.policy)
659
          res = res.addOrUpdateResourceSnapshot(name,
660
            instanceId, e.value, e.occurredMillis)._1
661
    }
662
    initState.copy(ownedResources = res)
663
  }
664

    
665
  /**
666
   * Replay wallet entries on the provided user state
667
   */
668
  def replayWalletEntries(initState: UserState, events: List[WalletEntry],
669
                          from: Long, to: Long): UserState = {
670
    var cred = initState.credits
671
    events
672
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
673
      .foreach {
674
        w => cred = cred.copy(data = w.value, snapshotTime = w.occurredMillis)
675
    }
676
    initState.copy(credits = cred)
677
  }
678

    
679
  /**
680
   * Persist current user state
681
   */
682
  private[this] def saveUserState(): Unit = {
683
    _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
684
    _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
685
      case Just(record) => record
686
      case NoVal => ERROR("Unknown error saving state")
687
      case Failed(e, a) =>
688
        ERROR("Saving state failed: %s error was: %s".format(a,e));
689
    }
690
  }
691

    
692
  protected def receive: Receive = {
693
    case m @ AquariumPropertiesLoaded(props) ⇒
694
      this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
695
      INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
696

    
697
    case m @ UserActorInitWithUserId(userId) ⇒
698
      this._userId = userId
699
      DEBUG("Actor starting, loading state")
700
      ensureUserState()
701

    
702
    case m @ ProcessResourceEvent(resourceEvent) ⇒
703
      if(resourceEvent.userId != this._userId) {
704
        ERROR("Received %s but my userId = %s".format(m, this._userId))
705
      } else {
706
        ensureUserState()
707
        processResourceEvent(resourceEvent, true)
708
      }
709

    
710
    case m @ ProcessUserEvent(userEvent) ⇒
711
      if(userEvent.userId != this._userId) {
712
        ERROR("Received %s but my userId = %s".format(m, this._userId))
713
      } else {
714
        ensureUserState()
715
        processUserEvent(userEvent)
716
      }
717

    
718
    case m @ RequestUserBalance(userId, timestamp) ⇒
719
      if(this._userId != userId) {
720
        ERROR("Received %s but my userId = %s".format(m, this._userId))
721
        // TODO: throw an exception here
722
      } else {
723
        // This is the big party.
724
        // Get the user state, if it exists and make sure it is not stale.
725
        ensureUserState()
726

    
727
        // Do we have a user state?
728
        if(_userState ne null) {
729
          // Yep, we do. See what there is inside it.
730
          val credits = _userState.credits
731
          val creditsTimestamp = credits.snapshotTime
732

    
733
          // Check if data is stale
734
          if(creditsTimestamp + _timestampTheshold > timestamp) {
735
            // No, it's OK
736
            self reply UserResponseGetBalance(userId, credits.data)
737
          } else {
738
            // Yep, data is stale and must recompute balance
739
            // FIXME: implement
740
            ERROR("FIXME: Should have computed a new value for %s".format(credits))
741
            self reply UserResponseGetBalance(userId, credits.data)
742
          }
743
        } else {
744
          // No user state exists. This is an error.
745
          val errMsg = "Could not load user state for %s".format(m)
746
          ERROR(errMsg)
747
          self reply ResponseUserBalance(userId, 0, Some(errMsg))
748
        }
749
      }
750

    
751
    case m @ UserRequestGetState(userId, timestamp) ⇒
752
      if(this._userId != userId) {
753
        ERROR("Received %s but my userId = %s".format(m, this._userId))
754
        // TODO: throw an exception here
755
      } else {
756
        // FIXME: implement
757
        ERROR("FIXME: Should have properly computed the user state")
758
        ensureUserState()
759
        self reply UserResponseGetState(userId, this._userState)
760
      }
761
  }
762

    
763
  override def postStop {
764
    DEBUG("Stopping, saving state")
765
    saveUserState
766
  }
767

    
768
  private[this] def DEBUG(fmt: String, args: Any*) =
769
    logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
770

    
771
  private[this] def INFO(fmt: String, args: Any*) =
772
      logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
773

    
774
  private[this] def WARN(fmt: String, args: Any*) =
775
    logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
776

    
777
  private[this] def ERROR(fmt: String, args: Any*) =
778
    logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
779
}