ownedResourcesSnapshot: OwnedResourcesSnapshot,
newWalletEntries: List[NewWalletEntry],
- occurredMillis: Long, // The time fro which this state is relevant
+
+ occurredMillis: Long, // When this user state was computed
// The last known change reason for this userState
lastChangeReason: UserStateChangeReason = NoSpecificChangeReason(),
}
object JsonNames {
- final val _id = "_id"
+ final val _id = "_id"
final val userID = "userID"
+ final val isFullBillingMonthState = "isFullBillingMonthState"
+ final val occurredMillis = "occurredMillis"
+
+ object theFullBillingMonth {
+ final val year = "year"
+ final val month = "month"
+ final val monthStartMillis = "monthStartMillis"
+ final val monthStopMillis = "monthStopMillis"
+ }
}
def createInitialUserState(userID: String,
"findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
clog.begin()
- def doCompute: UserState = {
+ def computeFullMonthlyBilling(): UserState = {
doFullMonthlyBilling(
userStateBootstrap,
billingMonthInfo,
clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
clog.end()
- initialUserState1
- } else {
- // Ask DB cache for the latest known user state for this billing period
- val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
- userID,
- billingMonthInfo.year,
- billingMonthInfo.month)
-
- latestUserStateOpt match {
- case None ⇒
- // Not found, must compute
- clog.debug("No user state found from cache, will have to (re)compute")
- val result = doCompute
- clog.end()
- result
-
- case Some(latestUserState) ⇒
- // Found a "latest" user state but need to see if it is indeed the true and one latest.
- // For this reason, we must count the events again.
- val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
- val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
- userID,
- billingMonthStartMillis,
- billingMonthStopMillis)
-
- val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
- counterDiff match {
- // ZERO, we are OK!
- case 0 ⇒
- // NOTE: Keep the caller's calculation reason
- latestUserState.copyForChangeReason(calculationReason)
-
- // We had more, so must recompute
- case n if n > 0 ⇒
- clog.debug(
- "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
- val result = doCompute
- clog.end()
- result
-
- // We had less????
- case n if n < 0 ⇒
- val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
- clog.warn(errMsg)
- throw new AquariumException(errMsg)
- }
- }
+ return initialUserState1
+ }
+
+ // Ask DB cache for the latest known user state for this billing period
+ val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
+ userID,
+ billingMonthInfo.year,
+ billingMonthInfo.month)
+
+ latestUserStateOpt match {
+ case None ⇒
+ // Not found, must compute
+ clog.debug("No user state found from cache, will have to (re)compute")
+ val result = computeFullMonthlyBilling
+ clog.end()
+ result
+
+ case Some(latestUserState) ⇒
+ // Found a "latest" user state but need to see if it is indeed the true and one latest.
+ // For this reason, we must count the events again.
+ val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+ val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
+ userID,
+ billingMonthStartMillis,
+ billingMonthStopMillis)
+
+ val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+ counterDiff match {
+ // ZERO, we are OK!
+ case 0 ⇒
+ // NOTE: Keep the caller's calculation reason
+ latestUserState.copyForChangeReason(calculationReason)
+
+ // We had more, so must recompute
+ case n if n > 0 ⇒
+ clog.debug(
+ "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+ val result = computeFullMonthlyBilling
+ clog.end()
+ result
+
+ // We had less????
+ case n if n < 0 ⇒
+ val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+ clog.warn(errMsg)
+ throw new AquariumInternalError(errMsg)
+ }
}
}
}
def isOutOfSyncForBillingPeriod(billingStartMillis: Long, billingStopMillis: Long): Boolean = {
- isReceivedWithinMillis(billingStartMillis, billingStopMillis) &&
- (occurredMillis < billingStartMillis || occurredMillis > billingStopMillis)
+ // Out of sync events are those that were received within the billing period
+ // but actually occurred outside the billing period.
+ isReceivedWithinMillis(billingStartMillis, billingStopMillis) &&
+ !isOccurredWithinMillis(billingStartMillis, billingStopMillis)
}
def toDebugString(useOnlyInstanceId: Boolean = false): String = {
receivedMillis = ts, validFrom = ts)
config.policyStore.findPolicyEntry(newPolicy.id) match {
- case Just(x) =>
+ case Some(x) =>
logger.warn("Policy file contents not modified")
- case NoVal =>
+ case None =>
if (!pol.isEmpty) {
val toUpdate = pol.last.copy(validTo = ts - 1)
config.policyStore.updatePolicyEntry(toUpdate)
} else {
config.policyStore.storePolicyEntry(newPolicy)
}
- case failed @ Failed(e) =>
- failed.throwMe
}
}
/**
* Find a policy by its unique id
*/
- def findPolicyEntry(id: String): Maybe[PolicyEntry]
+ def findPolicyEntry(id: String): Option[PolicyEntry]
}
\ No newline at end of file
/**
* Count and return the number of "out of sync" events for a billing month.
*/
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long
+ def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long
/**
* Finds all relevant resource events for the billing period.
*/
def findUserStateByUserID(userID: String): Option[UserState]
- def findLatestUserStateByUserID(userID: String): Option[UserState]
-
/**
* Find the most up-to-date user state for the particular billing period.
*/
- def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
+ def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
/**
* Delete a state for a user
_userStates.find(_.userID == userID)
}
- def findLatestUserStateByUserID(userID: String) = {
- val goodOnes = _userStates.filter(_.userID == userID)
-
- goodOnes.sortWith {
- case (us1, us2) ⇒
- us1.occurredMillis > us2.occurredMillis
- } match {
- case head :: _ ⇒
- Some(head)
- case _ ⇒
- None
- }
- }
-
def findLatestUserStateForEndOfBillingMonth(userID: String,
yearOfBillingMonth: Int,
billingMonth: Int): Option[UserState] = {
}.toList
}
- def countOutOfSyncEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
+ def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
_resourceEvents.filter { case ev ⇒
ev.userID == userID &&
// out of sync events are those that were received in the billing month but occurred in previous (or next?)
p :: acc
}
- def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
- case Some(x) => Just(x)
- case None => NoVal
+ def findPolicyEntry(id: String) = {
+ _policyEntries.find(p => p.id == id)
}
}
MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
}
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
- // FIXME: Implement
- 0L
+ def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
+ val query = new BasicDBObjectBuilder().
+ add(ResourceEventModel.Names.userID, userID).
+ // received within the period
+ add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)).
+ add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)).
+ // occurred outside the period
+ add("$or", {
+ val dbList = new BasicDBList()
+ dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
+ dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
+ dbList
+ }).
+ get()
+
+ resourceEvents.count(query)
}
def findAllRelevantResourceEventsForBillingPeriod(userId: String,
val query = new BasicDBObject(UserStateJsonNames.userID, userID)
val cursor = userStates find query
- withCloseable(cursor) { cursor ⇒
- if(cursor.hasNext)
- Some(MongoDBStore.dbObjectToUserState(cursor.next()))
- else
- None
- }
- }
-
-
- def findLatestUserStateByUserID(userID: String) = {
- // FIXME: implement
- null
+ MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
}
- def findLatestUserStateForEndOfBillingMonth(userId: String,
+ def findLatestUserStateForEndOfBillingMonth(userID: String,
yearOfBillingMonth: Int,
billingMonth: Int): Option[UserState] = {
- None // FIXME: implement
+ val query = new BasicDBObjectBuilder().
+ add(UserState.JsonNames.userID, userID).
+ add(UserState.JsonNames.isFullBillingMonthState, true).
+ add(UserState.JsonNames.theFullBillingMonth.year, yearOfBillingMonth).
+ add(UserState.JsonNames.theFullBillingMonth.month, billingMonth).
+ get()
+
+ // Descending order, so that the latest comes first
+ val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
+
+ val cursor = userStates.find(query).sort(sorter)
+
+ MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
}
def deleteUserState(userId: String) = {
// Normally one such event is allowed ...
val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
- withCloseable(cursor) { cursor ⇒
- if(cursor.hasNext) {
- Some(MongoDBIMEvent.fromDBObject(cursor.next()))
- } else {
- None
- }
- }
+ MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
}
def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
val query = new BasicDBObject(IMEventNames.userID, userID)
val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
- withCloseable(cursor) { cursor ⇒
- if(cursor.hasNext) {
- Some(MongoDBIMEvent.fromDBObject(cursor.next()))
- } else {
- None
- }
- }
-
+ MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
}
/**
UserState.fromJson(JSON.serialize(dbObj))
}
+ def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
+ withCloseable(cursor) { cursor ⇒
+ if(cursor.hasNext) {
+ Some(f(cursor.next()))
+ } else {
+ None
+ }
+ }
+ }
+
def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
PolicyEntry.fromJson(JSON.serialize(dbObj))
}