Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala @ 16319b44

History | View | Annotate | Download (25.1 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.user.actor
37

    
38
import gr.grnet.aquarium.actor._
39
import gr.grnet.aquarium.Configurator
40
import gr.grnet.aquarium.processor.actor._
41
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
42
import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
43
import gr.grnet.aquarium.user._
44
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
45
import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLResource, DSLSimpleResource, DSLComplexResource}
46
import java.util.Date
47
import gr.grnet.aquarium.util.{DateUtils, TimeHelpers, Loggable}
48

    
49

    
50
/**
51
 *
52
 * @author Christos KK Loverdos <loverdos@gmail.com>
53
 */
54

    
55
class UserActor extends AquariumActor
56
  with Loggable with Accounting with DateUtils {
57
  @volatile
58
  private[this] var _userId: String = _
59
  @volatile
60
  private[this] var _userState: UserState = _
61
  @volatile
62
  private[this] var _timestampTheshold: Long = _
63

    
64
  def role = UserActorRole
65

    
66
  private[this] def _configurator: Configurator = Configurator.MasterConfigurator
67

    
68
  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
69
    if(checkForOlderEvents) {
70
      DEBUG("Checking for events older than %s" format resourceEvent)
71
      processOlderResourceEvents(resourceEvent)
72
    }
73

    
74
    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
75
  }
76

    
77

    
78
  /**
79
   * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
80
   */
81
  def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
82
    Nil
83
  }
84

    
85
  def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
86
    Nil
87
  }
88

    
89
  /**
90
   * Find resource events that precede the given one and are unprocessed.
91
   */
92
  private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
93
    if(rcEvent.isOnOffEvent(policy)) {
94
      findOlderResourceEventsForOnOff(rcEvent, policy)
95
    } else {
96
      findOlderResourceEventsForOther(rcEvent, policy)
97
    }
98
  }
99

    
100
  /**
101
   * Find and process older resource events.
102
   *
103
   * Older resource events are found based on the latest credit calculation, that is the latest
104
   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
105
   * have been calculated for these resource events and start from there.
106
   */
107
  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
108
    assert(_userId == resourceEvent.userId)
109
    val rceId = resourceEvent.id
110
    val userId = resourceEvent.userId
111
    val resourceEventStore = _configurator.resourceEventStore
112
    val walletEntriesStore = _configurator.walletStore
113

    
114
    // 1. Find latest wallet entry
115
    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
116
    latestWalletEntriesM match {
117
      case Just(latestWalletEntries) ⇒
118
        // The time on which we base the selection of the older events
119
        val selectionTime = latestWalletEntries.head.occurredMillis
120

    
121
        // 2. Now find all resource events past the time of the latest wallet entry.
122
        //    These events have not been processed, except probably those ones
123
        //    that have the same `occurredMillis` with `selectionTime`
124
        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
125

    
126
        // 3. Filter out those resource events for which no wallet entry has actually been
127
        //    produced.
128
        val rcEventsToProcess = for {
129
          oldRCEvent        <- oldRCEvents
130
          oldRCEventId      = oldRCEvent.id
131
          latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
132
        } yield {
133
          oldRCEvent
134
        }
135

    
136
        DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
137

    
138
        for {
139
          rcEventToProcess <- rcEventsToProcess
140
        } {
141
          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
142
        }
143
      case NoVal ⇒
144
        DEBUG("No events to process older than %s".format(resourceEvent))
145
      case Failed(e, m) ⇒
146
        ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
147
    }
148
  }
149

    
150
  private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = {
151
    val walletEntriesStore = _configurator.walletStore
152
    for(walletEntry <- walletEntries) {
153
      val allow = walletEntry.finalized || allowNonFinalized
154
      if(allow) {
155
        walletEntriesStore.storeWalletEntry(walletEntry)
156
      }
157
    }
158
  }
159

    
160
  private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
161
    val newCredits = for {
162
      walletEntry <- walletEntries if(walletEntry.finalized)
163
    } yield walletEntry.value.toDouble
164

    
165
    newCredits.sum
166
  }
167

    
168
  /**
169
   * Process the resource event as if nothing else matters. Just do it.
170
   */
171
  private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
172
    val start = System.currentTimeMillis
173
    DEBUG("Processing [%s] %s".format(logLabel, ev))
174

    
175
    // Initially, the user state (regarding resources) is empty.
176
    // So we have to compensate for both a totally empty resource state
177
    // and the update with new values.
178

    
179
    // 1. Find the resource definition
180
    val policy = Policy.policy
181
    policy.findResource(ev.resource) match {
182
      case Some(resource) ⇒
183
        // 2. Get the instance id and value for the resource
184
        val instanceIdM = resource match {
185
          // 2.1 If it is complex, from the details map, get the field which holds the instanceId
186
          case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
187
            ev.details.get(descriminatorField) match {
188
              case Some(instanceId) ⇒
189
                Just(instanceId)
190
              case None ⇒
191
                // We should have some value under this key here....
192
                Failed(throw new AccountingException("")) //TODO: See what to do here
193
            }
194
          // 2.2 If it is simple, ...
195
          case DSLSimpleResource(name, unit, costPolicy) ⇒
196
            // ... by convention, the instanceId of a simple resource is just "1"
197
            // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
198
            Just("1")
199
        }
200

    
201
        // 3. Did we get a valid instanceId?
202
        instanceIdM match {
203
          // 3.1 Yes, time to get/update the current state
204
          case Just(instanceId) ⇒
205
            val oldOwnedResources = _userState.ownedResources
206

    
207
            // A. First state diff: the modified resource value
208
            val StateChangeMillis = TimeHelpers.nowMillis
209
            val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources.
210
              addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis)
211
            val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime)
212

    
213
            // Calculate the wallet entries generated from this resource event
214
            _userState.maybeDSLAgreement match {
215
              case Just(agreement) ⇒
216
                val walletEntriesM = chargeEvent(ev, agreement, ev.value,
217
                  new Date(previousRCUpdateTime),
218
                  findRelatedEntries(resource, ev.getInstanceId(policy)))
219
                walletEntriesM match {
220
                  case Just(walletEntries) ⇒
221
                    _storeWalletEntries(walletEntries, true)
222

    
223
                    // B. Second state diff: the new credits
224
                    val newCreditsSum = _calcNewCreditSum(walletEntries)
225
                    val oldCredits    = _userState.safeCredits.data
226
                    val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
227

    
228
                    // Finally, the userState change
229
                    DEBUG("Credits   = %s".format(this._userId, newCredits))
230
                    DEBUG("Resources = %s".format(this._userId, newOwnedResources))
231
                    this._userState = this._userState.copy(
232
                      credits = newCredits,
233
                      ownedResources = newOwnedResources
234
                    )
235
                  case NoVal ⇒
236
                    DEBUG("No wallet entries generated for %s".format(ev))
237
                  case failed @ Failed(e, m) ⇒
238
                    failed
239
                }
240
                
241
              case NoVal ⇒
242
                Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
243
              case failed @ Failed(e, m) ⇒
244
                failed
245
            }
246
          // 3.2 No, no luck, this is an error
247
          case NoVal ⇒
248
            Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
249
          case failed @ Failed(e, m) ⇒
250
            failed
251
        }
252
      // No resource definition found, this is an error
253
      case None ⇒ // Policy.policy.findResource(ev.resource)
254
        Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
255
    }
256

    
257
    DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
258
  }
259

    
260
  private[this] def processCreateUser(event: UserEvent): Unit = {
261
    val userId = event.userId
262
    DEBUG("Creating user from state %s", event)
263
    val usersDB = _configurator.storeProvider.userStateStore
264
    usersDB.findUserStateByUserId(userId) match {
265
      case Just(userState) ⇒
266
        WARN("User already created, state = %s".format(userState))
267
      case failed @ Failed(e, m) ⇒
268
        ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
269
      case NoVal ⇒
270
        // OK. Create a default UserState and store it
271
        val now = TimeHelpers.nowMillis
272
        val agreementOpt = Policy.policy.findAgreement("default")
273
        
274
        if(agreementOpt.isEmpty) {
275
          ERROR("No default agreement found. Cannot initialize user state")
276
        } else {
277
          this._userState = UserState(
278
            userId,
279
            0,
280
            ActiveSuspendedSnapshot(event.isStateActive, now),
281
            CreditSnapshot(0, now),
282
            AgreementSnapshot(agreementOpt.get.name, now),
283
            RolesSnapshot(event.roles, now),
284
            PaymentOrdersSnapshot(Nil, now),
285
            OwnedGroupsSnapshot(Nil, now),
286
            GroupMembershipsSnapshot(Nil, now),
287
            OwnedResourcesSnapshot(List(), now)
288
          )
289

    
290
          saveUserState
291
          DEBUG("Created and stored %s", this._userState)
292
        }
293
    }
294
  }
295

    
296
  private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
297
    val walletDB = _configurator.storeProvider.walletEntryStore
298
    walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
299
  }
300

    
301

    
302
  private[this] def processModifyUser(event: UserEvent): Unit = {
303
    val now = TimeHelpers.nowMillis
304
    val newActive = ActiveSuspendedSnapshot(event.isStateActive, now)
305

    
306
    DEBUG("New active status = %s".format(newActive))
307

    
308
    this._userState = this._userState.copy( active = newActive )
309
  }
310
  /**
311
   * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
312
   */
313
  private[this] def processUserEvent(event: UserEvent): Unit = {
314
    if(event.isCreateUser) {
315
      processCreateUser(event)
316
    } else if(event.isModifyUser) {
317
      processModifyUser(event)
318
    }
319
  }
320

    
321
  /**
322
   * Try to load from the DB the latest known info (snapshot data) for the given user.
323
   */
324
  private[this] def findUserState(userId: String): Maybe[UserState] = {
325
    val usersDB = _configurator.storeProvider.userStateStore
326
    usersDB.findUserStateByUserId(userId)
327
  }
328

    
329
  /**
330
   * Tries to makes sure that the internal user state exists.
331
   *
332
   * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
333
   *
334
   */
335
  private[this] def ensureUserState(): Unit = {
336
    /*if(null eq this._userState) {
337
      findUserState(this._userId) match {
338
        case Just(userState) ⇒
339
          DEBUG("Loaded user state %s from DB", userState)
340
          //TODO: May be out of sync with the event store, rebuild it here
341
          this._userState = userState
342
          rebuildState(this._userState.oldestSnapshotTime)
343
        case Failed(e, m) ⇒
344
          ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
345
        case NoVal ⇒
346
          //TODO: Rebuild actor state here.
347
          rebuildState(0)
348
          WARN("Request for unknown (to Aquarium) user")
349
      }
350
    }*/
351

    
352
    if (_userState == null)
353
      rebuildState(0)
354
    else
355
      rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
356
  }
357

    
358
  /**
359
   * Replay the event log for all events that affect the user state, starting
360
   * from the provided time instant.
361
   */
362
  def rebuildState(from: Long): Unit =
363
    rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
364

    
365
  /**
366
   * Replay the event log for all events that affect the user state.
367
   */
368
  def rebuildState(from: Long, to: Long): Unit = {
369
    val start = System.currentTimeMillis()
370
    if (_userState == null)
371
      createBlankState
372

    
373
    //Rebuild state from user events
374
    val usersDB = _configurator.storeProvider.userEventStore
375
    val userEvents = usersDB.findUserEventsByUserId(_userId)
376
    val numUserEvents = userEvents.size
377
    _userState = replayUserEvents(_userState, userEvents, from, to)
378
      
379
    //Rebuild state from resource events
380
    val eventsDB = _configurator.storeProvider.resourceEventStore
381
    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
382
    val numResourceEvents = resourceEvents.size
383
    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
384

    
385
    //Rebuild state from wallet entries
386
    val wallet = _configurator.storeProvider.walletEntryStore
387
    val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
388
    val numWalletEntries = walletEnties.size
389
    _userState = replayWalletEntries(_userState, walletEnties, from, to)
390

    
391
    INFO(("Rebuilt state from %d events (%d user events, " +
392
      "%d resource events, %d wallet entries) in %d msec").format(
393
      numUserEvents + numResourceEvents + numWalletEntries,
394
      numUserEvents, numResourceEvents, numWalletEntries,
395
      System.currentTimeMillis() - start))
396
  }
397

    
398
  /**
399
   * Create an empty state for a user
400
   */
401
  def createBlankState = {
402
    val now = System.currentTimeMillis()
403
    val agreement = Policy.policy.findAgreement("default")
404

    
405
    this._userState = UserState(
406
      _userId,
407
      0,
408
      ActiveSuspendedSnapshot(false, now),
409
      CreditSnapshot(0, now),
410
      AgreementSnapshot(agreement.get.name, now),
411
      RolesSnapshot(List(), now),
412
      PaymentOrdersSnapshot(Nil, now),
413
      OwnedGroupsSnapshot(Nil, now),
414
      GroupMembershipsSnapshot(Nil, now),
415
      OwnedResourcesSnapshot(List(), now)
416
    )
417
  }
418

    
419
  /**
420
   * Replay user events on the provided user state
421
   */
422
  def replayUserEvents(initState: UserState, events: List[UserEvent],
423
                       from: Long, to: Long): UserState = {
424
    var act = initState.active
425
    var rol = initState.roles
426
    events
427
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
428
      .foreach {
429
        e =>
430
          act = act.copy(
431
            data = e.isStateActive, snapshotTime = e.occurredMillis)
432
          // TODO: Implement the following
433
          //_userState.agreement = _userState.agreement.copy(
434
          //  data = e.newAgreement, e.occurredMillis)
435

    
436
          rol = rol.copy(data = e.roles,
437
            snapshotTime = e.occurredMillis)
438
    }
439
    initState.copy(active = act, roles = rol)
440
  }
441

    
442
  /**
443
   * Replay resource events on the provided user state
444
   */
445
  def replayResourceEvents(initState: UserState, events: List[ResourceEvent],
446
                           from: Long, to: Long): UserState = {
447
    var res = initState.ownedResources
448
    events
449
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
450
      .foreach {
451
        e =>
452
          val name = Policy.policy.findResource(e.resource) match {
453
            case Some(x) => x.name
454
            case None => ""
455
          }
456

    
457
          val instanceId = e.getInstanceId(Policy.policy)
458
          res = res.addOrUpdateResourceSnapshot(name,
459
            instanceId, e.value, e.occurredMillis)._1
460
    }
461
    if (!events.isEmpty) {
462
      val snapTime = events.map{e => e.occurredMillis}.max
463
      res = res.copy(snapshotTime = snapTime)
464
    }
465
    initState.copy(ownedResources = res)
466
  }
467

    
468
  /**
469
   * Replay wallet entries on the provided user state
470
   */
471
  def replayWalletEntries(initState: UserState, events: List[WalletEntry],
472
                          from: Long, to: Long): UserState = {
473
    var cred = initState.credits
474
    events
475
      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
476
      .foreach {
477
        w =>
478
          val newVal = cred.data + w.value
479
          cred = cred.copy(data = newVal)
480
    }
481
    if (!events.isEmpty) {
482
      val snapTime = events.map{e => e.occurredMillis}.max
483
      cred = cred.copy(snapshotTime = snapTime)
484
    }
485
    initState.copy(credits = cred)
486
  }
487

    
488
  /**
489
   * Update wallet entries for all unprocessed events
490
   */
491
  def calcWalletEntries(): Unit = {
492
    ensureUserState
493

    
494
    if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return
495
    val eventsDB = _configurator.storeProvider.resourceEventStore
496
    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
497
    val policy = Policy.policy
498

    
499
    val walletEntries = resourceEvents.map {
500
      ev =>
501
        // TODO: Check that agreement exists
502
        val agr = policy.findAgreement(_userState.agreement.data).get
503

    
504
        val resource = policy.findResource(ev.resource) match {
505
          case Some(x) => x
506
          case None =>
507
            val errMsg = "Cannot find resource: %s".format(ev.resource)
508
            ERROR(errMsg)
509
            throw new AccountingException(errMsg) // FIXME: to throw or not to throw?
510
        }
511

    
512
        val instid = resource.isComplex match {
513
          case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
514
          case false => None
515
        }
516

    
517
        var curValue = 0F
518
        var lastUpdate = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)) match {
519
          case Some(x) => x.snapshotTime
520
          case None => Long.MaxValue //To trigger recalculation
521
        }
522

    
523
        if (lastUpdate > ev.occurredMillis) {
524
          //Event is older that current state. Rebuild state up to event timestamp
525
          val resHistory =
526
            ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.eventVersion, 0, ev.details) ::
527
            eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
528
          INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis));
529
          var res = OwnedResourcesSnapshot(List(), 0)
530
          resHistory.foreach {
531
            e =>
532
              res = res.addOrUpdateResourceSnapshot(e.resource, e.getInstanceId(policy), e.value, e.occurredMillis)._1
533
          }
534
          lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
535
          curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
536
        } else {
537
          curValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
538
        }
539

    
540
        val entries = chargeEvent(ev, agr, curValue, new Date(lastUpdate),
541
          findRelatedEntries(resource, ev.getInstanceId(policy)))
542
        INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis))
543
        entries match {
544
          case Just(x) => x
545
          case Failed(e, r) => List()
546
          case NoVal => List()
547
        }
548
    }.flatten
549

    
550
    val walletDB = _configurator.storeProvider.walletEntryStore
551
    walletEntries.foreach(w => walletDB.storeWalletEntry(w))
552

    
553
    ensureUserState
554
  }
555

    
556
  /**
557
   * Persist current user state
558
   */
559
  private[this] def saveUserState(): Unit = {
560
    _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
561
    _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
562
      case Just(record) => record
563
      case NoVal => ERROR("Unknown error saving state")
564
      case Failed(e, a) =>
565
        ERROR("Saving state failed: %s error was: %s".format(a,e));
566
    }
567
  }
568

    
569
  protected def receive: Receive = {
570
    case m @ AquariumPropertiesLoaded(props) ⇒
571
      this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
572
      INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
573

    
574
    case m @ UserActorInitWithUserId(userId) ⇒
575
      this._userId = userId
576
      DEBUG("Actor starting, loading state")
577
      ensureUserState()
578

    
579
    case m @ ProcessResourceEvent(resourceEvent) ⇒
580
      if(resourceEvent.userId != this._userId) {
581
        ERROR("Received %s but my userId = %s".format(m, this._userId))
582
      } else {
583
        //ensureUserState()
584
        calcWalletEntries()
585
        //processResourceEvent(resourceEvent, true)
586
      }
587

    
588
    case m @ ProcessUserEvent(userEvent) ⇒
589
      if(userEvent.userId != this._userId) {
590
        ERROR("Received %s but my userId = %s".format(m, this._userId))
591
      } else {
592
        ensureUserState()
593
        processUserEvent(userEvent)
594
      }
595

    
596
    case m @ RequestUserBalance(userId, timestamp) ⇒
597
      /*if(this._userId != userId) {
598
        ERROR("Received %s but my userId = %s".format(m, this._userId))
599
        // TODO: throw an exception here
600
      } else {
601
        // This is the big party.
602
        // Get the user state, if it exists and make sure it is not stale.
603
        ensureUserState()
604

    
605
        // Do we have a user state?
606
        if(_userState ne null) {
607
          // Yep, we do. See what there is inside it.
608
          val credits = _userState.credits
609
          val creditsTimestamp = credits.snapshotTime
610

    
611
          // Check if data is stale
612
          if(creditsTimestamp + _timestampTheshold > timestamp) {
613
            // No, it's OK
614
            self reply UserResponseGetBalance(userId, credits.data)
615
          } else {
616
            // Yep, data is stale and must recompute balance
617
            // FIXME: implement
618
            ERROR("FIXME: Should have computed a new value for %s".format(credits))
619
            self reply UserResponseGetBalance(userId, credits.data)
620
          }
621
        } else {
622
          // No user state exists. This is an error.
623
          val errMsg = "Could not load user state for %s".format(m)
624
          ERROR(errMsg)
625
          self reply ResponseUserBalance(userId, 0, Some(errMsg))
626
        }*/
627
        if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
628
          calcWalletEntries()
629
        self reply UserResponseGetBalance(userId, _userState.credits.data)
630
      //}
631

    
632
    case m @ UserRequestGetState(userId, timestamp) ⇒
633
      if(this._userId != userId) {
634
        ERROR("Received %s but my userId = %s".format(m, this._userId))
635
        // TODO: throw an exception here
636
      } else {
637
        // FIXME: implement
638
        ERROR("FIXME: Should have properly computed the user state")
639
        ensureUserState()
640
        self reply UserResponseGetState(userId, this._userState)
641
      }
642
  }
643

    
644
  override def postStop {
645
    DEBUG("Stopping, saving state")
646
    //saveUserState
647
  }
648

    
649
  private[this] def DEBUG(fmt: String, args: Any*) =
650
    logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
651

    
652
  private[this] def INFO(fmt: String, args: Any*) =
653
      logger.info("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
654

    
655
  private[this] def WARN(fmt: String, args: Any*) =
656
    logger.warn("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
657

    
658
  private[this] def ERROR(fmt: String, args: Any*) =
659
    logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
660
}