- def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] = // Stores `event` in the `resourceEvents` collection; the result is whatever the helper below yields.
- MongoDBStore.storeAquariumEvent(event, resourceEvents) // Pure delegation: no work is done here beyond picking the target collection.
-
- def findResourceEventById(id: String): Maybe[ResourceEvent] = // Looks up a single resource event by `id` in the `resourceEvents` collection.
- MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent) // `dbObjectToResourceEvent` is handed over as the converter for the found document.
-
- def findResourceEventsByUserId(userId: String) // Lists the resource events whose user-id field equals `userId`.
- (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { // `sortWith` is an optional "less than" predicate, forwarded unchanged to `runQuery`.
- val query = new BasicDBObject(ResourceJsonNames.userId, userId) // Equality match on the user-id field only.
-
- MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith) // Runs against `resourceEvents`, passing the DBObject converter and the optional ordering.
- }
-
- def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
- val query = new BasicDBObject()
- query.put(ResourceJsonNames.userId, userId)
- query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp))
-
- val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
-
- val cursor = resourceEvents.find(query).sort(sort)
-
- try {
- val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
+ /**
+  * Pings the MongoDB instance used for resource events.
+  *
+  * While holding this object's monitor, the resource event collection is requested first
+  * (its result is discarded) and then `MongoDBStore.ping` is invoked on `mongo`.
+  */
+ def pingResourceEventStore(): Unit = {
+   this.synchronized {
+     getCollection(MongoDBStore.ResourceEventCollection)
+     MongoDBStore.ping(mongo)
+   }
+ }
+
+ /**
+  * Inserts `event` into the `resourceEvents` collection.
+  *
+  * A fresh `ObjectId` is generated and written onto the event via `setInStoreID` before the
+  * event is serialized with `AvroHelpers.bytesOfSpecificRecord`, so the stored payload already
+  * carries that ID. The user ID and the occurred/received timestamps are additionally stored
+  * as top-level attributes of the document.
+  *
+  * @param event the message to persist; it is mutated to receive its in-store ID
+  * @return the same `event` instance that was passed in
+  */
+ def insertResourceEvent(event: ResourceEventMsg): ResourceEventMsg = {
+   val mongoID = new ObjectId()
+   // Must precede the serialization below, otherwise the payload would not contain the ID.
+   event.setInStoreID(mongoID.toStringMongod)
+
+   val dbObject = new BasicDBObjectBuilder().
+     add(MongoDBStore.JsonNames._id, mongoID).
+     add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
+     add(MongoDBStore.JsonNames.userID, event.getUserID).
+     add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
+     add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
+     get()
+
+   MongoDBStore.insertDBObject(dbObject, resourceEvents)
+   event
+ }
+
+ /**
+  * Looks up a resource event by `id` and decodes its stored payload.
+  *
+  * The document is searched in `resourceEvents` through `MongoDBStore.findOneByAttribute`;
+  * when one is found, its payload attribute is cast to a byte array and decoded with
+  * `AvroHelpers.specificRecordOfBytes` into a new `ResourceEventMsg`.
+  *
+  * NOTE(review): the lookup uses `JsonNames.id` with a String value, whereas
+  * `insertResourceEvent` writes the identifier under `JsonNames._id` as an `ObjectId`.
+  * Confirm that these are meant to match.
+  *
+  * @param id the identifier to search for
+  * @return the decoded message, or `None` when the lookup yields nothing
+  */
+ def findResourceEventByID(id: String): Option[ResourceEventMsg] = {
+   MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id).map { found ⇒
+     val rawBytes = found.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
+     AvroHelpers.specificRecordOfBytes(rawBytes, new ResourceEventMsg)
+   }
+ }
+
+ /**
+  * Counts the resource events of `userID` that were received within
+  * `[startMillis, stopMillis]` but occurred outside of that interval.
+  *
+  * @param userID      the user whose events are counted
+  * @param startMillis inclusive lower bound of the period, in milliseconds
+  * @param stopMillis  inclusive upper bound of the period, in milliseconds
+  * @return the number of matching documents in `resourceEvents`
+  */
+ def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
+   // received within the period
+   // Both bounds must live in ONE sub-document: adding the same key to the builder twice
+   // overwrites the earlier value, which would silently drop the `$gte` lower bound.
+   val receivedWithinPeriod = new BasicDBObject("$gte", startMillis).append("$lte", stopMillis)
+
+   // occurred outside the period
+   val occurredOutsidePeriod = new BasicDBList()
+   occurredOutsidePeriod.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis)))
+   occurredOutsidePeriod.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis)))
+
+   val query = new BasicDBObjectBuilder().
+     add(MongoDBStore.JsonNames.userID, userID).
+     add(MongoDBStore.JsonNames.receivedMillis, receivedWithinPeriod).
+     add("$or", occurredOutsidePeriod).
+     get()
+
+   resourceEvents.count(query)
+ }
+
+ def foreachResourceEventOccurredInPeriod(
+ userID: String,
+ startMillis: Long,
+ stopMillis: Long
+ )(f: ResourceEventMsg ⇒ Unit): Long = {
+ var _counter= 0L
+ val query = new BasicDBObjectBuilder().
+ add(MongoDBStore.JsonNames.userID, userID).
+ add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
+ add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)).
+ get()
+
+ val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)
+ val cursor = resourceEvents.find(query).sort(sorter)
+
+ withCloseable(cursor) { cursor ⇒