private[this] def processResourceEventsAfterLastKnownUserState() {
// Update the user state snapshot with fresh (i.e. not previously processed) events.
-
}
private[this] def makeUserStateMsgUpToDate() {
*/
def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long
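+ /**
+ * Applies `f` to each resource event of the given user that occurred within the given period
+ * and reports how many events were passed to `f`.
+ *
+ * @param userID The ID of the user whose resource events are visited.
+ * @param startMillis Start of the period, in milliseconds.
+ * @param stopMillis End of the period, in milliseconds.
+ * @param f The callback invoked once for each matching resource event.
+ * @return The number of resource events processed.
+ */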
def foreachResourceEventOccurredInPeriod(
userID: String,
startMillis: Long,
stopMillis: Long
- )(f: ResourceEventMsg ⇒ Unit): Unit
+ )(f: ResourceEventMsg ⇒ Unit): Long
}
\ No newline at end of file
userID: String,
startMillis: Long,
stopMillis: Long
- )(f: ResourceEventMsg ⇒ Unit): Unit = {
+ )(f: ResourceEventMsg ⇒ Unit): Long = {
+ var _counter= 0L
_resourceEvents.filter { case ev ⇒
ev.getUserID == userID &&
MessageHelpers.isOccurredWithinMillis(ev, startMillis, stopMillis)
- }.foreach(f)
+ } foreach { rcEvent ⇒
+ f(rcEvent)
+ _counter += 1
+ }
+
+ _counter
}
//+ IMEventStore
userID: String,
startMillis: Long,
stopMillis: Long
- )(f: ResourceEventMsg ⇒ Unit): Unit = {
-
+ )(f: ResourceEventMsg ⇒ Unit): Long = {
+ var _counter= 0L
val query = new BasicDBObjectBuilder().
add(MongoDBStore.JsonNames.userID, userID).
add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
f(nextEvent)
+ _counter += 1
}
}
+
+ _counter
}
//-ResourceEventStore