- def findLatestUserWalletEntries(userId: String) = {
- Maybe {
- val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
- val cursor = wallets.find().sort(orderBy)
-
- try {
- val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
- if(cursor.hasNext) {
- val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
- buffer += walletEntry
-
- var _previousOccurredMillis = walletEntry.occurredMillis
- var _ok = true
-
- while(cursor.hasNext && _ok) {
- val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
- var currentOccurredMillis = walletEntry.occurredMillis
- _ok = currentOccurredMillis == _previousOccurredMillis
-
- if(_ok) {
- buffer += walletEntry
- }
- }
-
- buffer.toList
- } else {
- null
- }
- } finally {
- cursor.close()
- }
+ /**
+  * Persists a user state message in the `userStates` collection.
+  *
+  * A new `ObjectId` is generated and its string form is written back onto the
+  * message via `setInStoreID` before the payload bytes are produced, so the
+  * payload is computed from a message that already carries its in-store ID.
+  * Alongside the payload, the user ID, the occurrence time and the billing
+  * attributes are stored as separate fields of the database object.
+  *
+  * @param event the user state message to store; it is mutated (its in-store ID is set)
+  * @return the same `event` instance that was passed in
+  */
+ def insertUserState(event: UserStateMsg) = {
+   val fields = MongoDBStore.JsonNames
+   val storeID = new ObjectId()
+   event.setInStoreID(storeID.toStringMongod)
+
+   // The builder is filled step by step; the fields are added in a fixed order.
+   val builder = new BasicDBObjectBuilder()
+   builder.add(fields._id, storeID)
+   builder.add(fields.payload, AvroHelpers.bytesOfSpecificRecord(event))
+   builder.add(fields.userID, event.getUserID)
+   builder.add(fields.occurredMillis, event.getOccurredMillis)
+   builder.add(fields.isForFullMonth, event.getIsForFullMonth)
+   builder.add(fields.billingYear, event.getBillingYear)
+   builder.add(fields.billingMonth, event.getBillingMonth)
+   builder.add(fields.billingMonthDay, event.getBillingMonthDay)
+
+   MongoDBStore.insertDBObject(builder.get(), userStates)
+   event
+ }
+ //- UserStateStore
+
+ //+IMEventStore
+ /**
+  * Pings the IM event store.
+  *
+  * First asks for the IM event collection (the result is discarded), then
+  * delegates to `MongoDBStore.ping` with the `mongo` instance.
+  * NOTE(review): presumably the collection lookup is there to fail early if
+  * the collection cannot be obtained; confirm against `getCollection`.
+  */
+ def pingIMEventStore(): Unit = {
+   val whichCollection = MongoDBStore.IMEventCollection
+   getCollection(whichCollection)
+
+   MongoDBStore.ping(mongo)
+ }
+
+ /**
+  * Persists an IM event message in the `imEvents` collection.
+  *
+  * A new `ObjectId` is generated and its string form is written back onto the
+  * message via `setInStoreID` before the payload bytes are produced, so the
+  * payload is computed from a message that already carries its in-store ID.
+  * Alongside the payload, the user ID, the lower-cased event type and the
+  * occurred/received timestamps are stored as separate fields.
+  *
+  * The event type is lower-cased with `Locale.ROOT`, so the stored value does
+  * not depend on the JVM default locale (under a Turkish default locale, for
+  * example, "I" would otherwise be lower-cased to a dotless "ı").
+  *
+  * @param event the IM event message to store; it is mutated (its in-store ID is set)
+  * @return the same `event` instance that was passed in
+  */
+ def insertIMEvent(event: IMEventMsg) = {
+   val mongoID = new ObjectId()
+   event.setInStoreID(mongoID.toStringMongod)
+
+   val dbObject = new BasicDBObjectBuilder().
+     add(MongoDBStore.JsonNames._id, mongoID).
+     add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
+     add(MongoDBStore.JsonNames.userID, event.getUserID).
+     // Locale-independent lower-casing keeps the stored event type stable across hosts.
+     add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase(java.util.Locale.ROOT)).
+     add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
+     add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
+     get()
+
+   MongoDBStore.insertDBObject(dbObject, imEvents)
+   event
+ }
+
+ def findIMEventByID(id: String) = {
+ val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id)
+ for {
+ dbObject ← dbObjectOpt
+ payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
+ msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
+ } yield {
+ msg