Merge branch 'snapshots'
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
index 3aa450d..1433e48 100644 (file)
@@ -41,8 +41,8 @@ import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.converter.StdConverters
 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
 import gr.grnet.aquarium.message.MessageConstants
-import gr.grnet.aquarium.message.avro.gen.{UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
-import gr.grnet.aquarium.message.avro.{MessageFactory, OrderingHelpers, AvroHelpers}
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers}
 import gr.grnet.aquarium.store._
 import gr.grnet.aquarium.util._
 import gr.grnet.aquarium.util.Once
@@ -172,8 +172,8 @@ class MongoDBStore(
       userID: String,
       startMillis: Long,
       stopMillis: Long
-  )(f: ResourceEventMsg ⇒ Unit): Unit = {
-
+  )(f: ResourceEventMsg ⇒ Unit): Long = {
+    var _counter= 0L
     val query = new BasicDBObjectBuilder().
       add(MongoDBStore.JsonNames.userID, userID).
       add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
@@ -190,8 +190,11 @@ class MongoDBStore(
         val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
 
         f(nextEvent)
+        _counter += 1
       }
     }
+
+    _counter
   }
   //-ResourceEventStore
 
@@ -210,7 +213,7 @@ class MongoDBStore(
   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
     val query = new BasicDBObjectBuilder().
       add(MongoDBStore.JsonNames.userID, userID).
-      add(MongoDBStore.JsonNames.isFullBillingMonth, true).
+      add(MongoDBStore.JsonNames.isForFullMonth, true).
       add(MongoDBStore.JsonNames.billingYear, bmi.year).
       add(MongoDBStore.JsonNames.billingMonth, bmi.month).
       get()
@@ -225,6 +228,21 @@ class MongoDBStore(
     }
   }
 
+  def findLatestUserState(userID: String) = { // Looks up the stored user state with the highest occurredMillis for the given userID.
+    val query = new BasicDBObjectBuilder().
+      add(MongoDBStore.JsonNames.userID, userID).
+      get()
+
+    // Descending order on occurredMillis, so that the latest state comes first in the cursor
+    val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
+
+    val cursor = userStates.find(query).sort(sorter) // NOTE(review): no limit is set on the cursor; consider limit(1) if only the first record is consumed - confirm
+
+    withCloseable(cursor) { cursor ⇒ // presumably closes the cursor when the block exits (helper not visible here) - TODO confirm
+      MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) // result of this call is the method's result; presumably decodes the first record's payload into a UserStateMsg - verify against the helper
+    }
+  }
+
   /**
    * Stores a user state.
    */
@@ -237,7 +255,7 @@ class MongoDBStore(
       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
       add(MongoDBStore.JsonNames.userID, event.getUserID).
       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
-      add(MongoDBStore.JsonNames.isFullBillingMonth, event.getIsFullBillingMonth).
+      add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth).
       add(MongoDBStore.JsonNames.billingYear, event.getBillingYear).
       add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth).
       add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay).
@@ -317,24 +335,25 @@ class MongoDBStore(
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Unit) = {
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = { // f is called once per event; it returns true to keep iterating, false to stop early
     val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
 
+    var _shouldContinue = true // stays true until f returns false for some event
     withCloseable(cursor) { cursor ⇒
-      while(cursor.hasNext) {
+      while(_shouldContinue && cursor.hasNext) { // the flag is tested first, so hasNext is not called again once f has asked to stop
         val dbObject = cursor.next()
         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
         val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
 
-        f(msg)
+        _shouldContinue = f(msg) // the callback's answer decides whether the next event is fetched
       }
     }
+
+    _shouldContinue // false if f stopped the iteration; true if every matching event was visited (also true when there were none)
   }
   //-IMEventStore
 
-
-
   //+PolicyStore
   def foreachPolicy[U](f: PolicyMsg ⇒ U) {
     val cursor = policies.find()