WARN("Request for unknown (to Aquarium) user")
}
}
+ /*
+ if (_userState == null)
+ rebuildState(0)
+ else
+ rebuildState(_userState.newestSnapshotTime, System.currentTimeMillis())
+ */
}
/**
val wallet = _configurator.storeProvider.walletEntryStore
val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to))
val numWalletEntries = walletEnties.size
- _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+ _userState = replayWalletEntries(_userState, walletEnties, from, to)
INFO(("Rebuilt state from %d events (%d user events, " +
"%d resource events, %d wallet entries) in %d msec").format(
events
.filter(e => e.occurredMillis >= from && e.occurredMillis < to)
.foreach {
- w => cred = cred.copy(data = w.value, snapshotTime = w.occurredMillis)
+ w =>
+ val newVal = cred.data + w.value
+ cred = cred.copy(data = newVal, snapshotTime = w.occurredMillis)
}
initState.copy(credits = cred)
}
/**
+ * Update wallet entries for all unprocessed events.
+ *
+ * Makes sure the user state is loaded and then, unless the owned-resources
+ * snapshot is older than the credits snapshot, charges every resource event
+ * returned by `findResourceEventsByUserIdAfterTimestamp` for the credits
+ * snapshot time and stores the resulting wallet entries. Events whose
+ * charging fails or yields no value contribute no entries; failures are
+ * logged. The user state is checked again once the entries are stored.
+ *
+ * The agreement, resource and resource-snapshot lookups call `.get` and
+ * therefore throw if the looked-up item is missing.
+ */
+ def calcWalletEntries(): Unit = {
+   ensureUserState
+
+   // Nothing to do when the resource snapshot is older than the credits snapshot.
+   if (_userState.ownedResources.snapshotTime >= _userState.credits.snapshotTime) {
+     val eventsDB = _configurator.storeProvider.resourceEventStore
+     val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
+     val policy = Policy.policy
+
+     val walletEntries = resourceEvents.flatMap {
+       ev =>
+         // TODO: Check that agreement exists
+         val agr = policy.findAgreement(_userState.agreement.data).get
+         val instanceId = ev.getInstanceId(policy)
+
+         // Since we always process resource updates before producing wallet
+         // entries, it is safe to assume that a resource instance has already
+         // been recorded in the state and avoid further checks. If not, a
+         // bug lurks somewhere, so just let it crash.
+         _userState.ownedResources.findResourceSnapshot(ev.resource, instanceId).get
+         val resource = policy.findResource(ev.resource).get
+
+         val instid =
+           if (resource.isComplex) ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
+           else None
+         val resHistory = eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
+
+         // Replay every historical event itself. Re-applying the current
+         // event once per history item would ignore the recorded history.
+         val res = resHistory.foldLeft(OwnedResourcesSnapshot(List(), 0)) {
+           (acc, e) =>
+             acc.addOrUpdateResourceSnapshot(e.resource, e.getInstanceId(policy), e.value, e.occurredMillis)._1
+         }
+
+         // Look the rebuilt snapshot up once; crashes if the history did not
+         // produce an entry for this resource instance.
+         val snapshot = res.findResourceSnapshot(ev.resource, instanceId).get
+         val curValue = snapshot.data
+         val lastUpdate = snapshot.snapshotTime
+
+         chargeEvent(ev, agr, curValue, new Date(lastUpdate),
+           findRelatedEntries(resource, instanceId)) match {
+           case Just(x) => x
+           case Failed(cause, reason) =>
+             // Keep processing the remaining events, but leave a trace.
+             WARN("Failed to charge resource event %s: %s (%s)".format(ev, reason, cause))
+             List()
+           case NoVal => List()
+         }
+     }
+
+     val walletDB = _configurator.storeProvider.walletEntryStore
+     walletEntries.foreach(w => walletDB.storeWalletEntry(w))
+
+     ensureUserState
+   }
+ }
+
+ /**
* Persist current user state
*/
private[this] def saveUserState(): Unit = {
ERROR(errMsg)
self reply ResponseUserBalance(userId, 0, Some(errMsg))
}
+ //calcWalletEntries()
+ self reply UserResponseGetBalance(userId, _userState.credits.data)
}
case m @ UserRequestGetState(userId, timestamp) ⇒