while(cursor.hasNext) {
buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
}
- buffer.toList
+ buffer.toList.sortWith(_sortByTimestampAsc)
} finally {
cursor.close()
}
instid: Option[String], upTo: Long) : List[ResourceEvent] = {
val query = new BasicDBObject()
query.put(ResourceJsonNames.userId, userId)
- query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lte", upTo))
+ query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
query.put(ResourceJsonNames.resource, resName)
instid match {
while(cursor.hasNext) {
buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
}
- buffer.toList
+ buffer.toList.sortWith(_sortByTimestampAsc)
} finally {
cursor.close()
}
MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
}
+ def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = { // Wallet entries of `userId` that occurred after `from`
+ val q = new BasicDBObject()
+ q.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime)) // "$gt" is exclusive: entries stamped exactly at `from` are not matched
+ q.put(ResourceJsonNames.userId, userId)
+
+ MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) // presumably returns results ordered by ascending timestamp -- confirm against runQuery
+ }
+
def findLatestUserWalletEntries(userId: String) = {
Maybe {
val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, -1) // -1 is descending order
*
*/
private[this] def ensureUserState(): Unit = {
- if(null eq this._userState) {
+ /*if(null eq this._userState) {
findUserState(this._userId) match {
case Just(userState) ⇒
DEBUG("Loaded user state %s from DB", userState)
rebuildState(0)
WARN("Request for unknown (to Aquarium) user")
}
- }
- /*
+ }*/
+
if (_userState == null)
rebuildState(0)
else
- rebuildState(_userState.newestSnapshotTime, System.currentTimeMillis())
- */
+ rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
}
/**
//Rebuild state from wallet entries
val wallet = _configurator.storeProvider.walletEntryStore
- val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to))
+ val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
val numWalletEntries = walletEnties.size
_userState = replayWalletEntries(_userState, walletEnties, from, to)
* Create an empty state for a user
*/
def createBlankState = {
- val now = 0
+ val now = System.currentTimeMillis()
val agreement = Policy.policy.findAgreement("default")
this._userState = UserState(
res = res.addOrUpdateResourceSnapshot(name,
instanceId, e.value, e.occurredMillis)._1
}
+ if (!events.isEmpty) {
+ val snapTime = events.map{e => e.occurredMillis}.max
+ res = res.copy(snapshotTime = snapTime)
+ }
initState.copy(ownedResources = res)
}
.foreach {
w =>
val newVal = cred.data + w.value
- cred = cred.copy(data = newVal, snapshotTime = w.occurredMillis)
+ cred = cred.copy(data = newVal)
+ }
+ if (!events.isEmpty) {
+ val snapTime = events.map{e => e.occurredMillis}.max
+ cred = cred.copy(snapshotTime = snapTime)
}
initState.copy(credits = cred)
}
def calcWalletEntries(): Unit = {
ensureUserState
- if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return List()
- println("%d %d".format(_userState.ownedResources.snapshotTime, _userState.credits.snapshotTime))
+ if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return
val eventsDB = _configurator.storeProvider.resourceEventStore
val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
val policy = Policy.policy
// TODO: Check that agreement exists
val agr = policy.findAgreement(_userState.agreement.data).get
- // Since we always process resource updates before producing wallet
- // entries, it is safe to assume that a resource instance has already
- // been recorded in the state and avoid further checks. If not, a
- // bug lurks somewhere, so just let it crash.
- val prevUpdate = _userState.ownedResources.findResourceSnapshot(
- ev.resource, ev.getInstanceId(policy)).get.snapshotTime
- val resource = policy.findResource(ev.resource).get
+ val resource = policy.findResource(ev.resource) match {
+ case Some(x) => x
+ case None =>
+ val errMsg = "Cannot find resource: %s".format(ev.resource)
+ ERROR(errMsg)
+ throw new AccountingException(errMsg) // FIXME: to throw or not to throw?
+ }
val instid = resource.isComplex match {
case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
case false => None
}
- val resHistory = eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
- var res = OwnedResourcesSnapshot(List(), 0)
- resHistory.foreach {
- e =>
- res = res.addOrUpdateResourceSnapshot(ev.resource, ev.getInstanceId(policy), ev.value, ev.occurredMillis)._1
+ var curValue = 0F
+ var lastUpdate = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)) match {
+ case Some(x) => x.snapshotTime
+ case None => Long.MaxValue //To trigger recalculation
}
- val curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
- val lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+ if (lastUpdate > ev.occurredMillis) {
+ //Event is older than current state. Rebuild state up to event timestamp
+ val resHistory =
+ ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.eventVersion, 0, ev.details) ::
+ eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
+ INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis));
+ var res = OwnedResourcesSnapshot(List(), 0)
+ resHistory.foreach {
+ e =>
+ res = res.addOrUpdateResourceSnapshot(e.resource, e.getInstanceId(policy), e.value, e.occurredMillis)._1
+ }
+ lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+ curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
+ } else {
+ curValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
+ }
val entries = chargeEvent(ev, agr, curValue, new Date(lastUpdate),
findRelatedEntries(resource, ev.getInstanceId(policy)))
-
+ INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis))
entries match {
case Just(x) => x
case Failed(e, r) => List()
if(resourceEvent.userId != this._userId) {
ERROR("Received %s but my userId = %s".format(m, this._userId))
} else {
- ensureUserState()
- processResourceEvent(resourceEvent, true)
+ //ensureUserState()
+ calcWalletEntries()
+ //processResourceEvent(resourceEvent, true)
}
case m @ ProcessUserEvent(userEvent) ⇒
}
case m @ RequestUserBalance(userId, timestamp) ⇒
- if(this._userId != userId) {
+ /*if(this._userId != userId) {
ERROR("Received %s but my userId = %s".format(m, this._userId))
// TODO: throw an exception here
} else {
val errMsg = "Could not load user state for %s".format(m)
ERROR(errMsg)
self reply ResponseUserBalance(userId, 0, Some(errMsg))
- }
- //calcWalletEntries()
+ }*/
+ if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+ calcWalletEntries()
self reply UserResponseGetBalance(userId, _userState.credits.data)
- }
+ //}
case m @ UserRequestGetState(userId, timestamp) ⇒
if(this._userId != userId) {
override def postStop {
DEBUG("Stopping, saving state")
- saveUserState
+ //saveUserState
}
private[this] def DEBUG(fmt: String, args: Any*) =