WIP Resource event handling
author Christos KK Loverdos <loverdos@gmail.com>
Wed, 6 Jun 2012 15:28:53 +0000 (18:28 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Wed, 6 Jun 2012 15:28:53 +0000 (18:28 +0300)
src/main/scala/gr/grnet/aquarium/computation/UserState.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/event/model/resource/ResourceEventModel.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala
src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala
src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

index 4a5b56c..df3c431 100644 (file)
@@ -141,7 +141,8 @@ case class UserState(
     ownedResourcesSnapshot: OwnedResourcesSnapshot,
 
     newWalletEntries: List[NewWalletEntry],
-    occurredMillis: Long, // The time fro which this state is relevant
+
+    occurredMillis: Long, // When this user state was computed
 
     // The last known change reason for this userState
     lastChangeReason: UserStateChangeReason = NoSpecificChangeReason(),
@@ -223,8 +224,17 @@ object UserState {
   }
 
   object JsonNames {
-    final val _id = "_id"
+    final val _id    = "_id"
     final val userID = "userID"
+    final val isFullBillingMonthState = "isFullBillingMonthState"
+    final val occurredMillis = "occurredMillis"
+    // NOTE(review): the names below are bare ("year", "month", ...); if these fields live in a nested document, queries need a dotted path - confirm.
+    object theFullBillingMonth {
+      final val year  = "year"
+      final val month = "month"
+      final val monthStartMillis = "monthStartMillis"
+      final val monthStopMillis  = "monthStopMillis"
+    }
   }
 
   def createInitialUserState(userID: String,
index a3a1d84..c5b5925 100644 (file)
@@ -72,7 +72,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString)
     clog.begin()
 
-    def doCompute: UserState = {
+    def computeFullMonthlyBilling(): UserState = {
       doFullMonthlyBilling(
         userStateBootstrap,
         billingMonthInfo,
@@ -98,53 +98,53 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
       clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
       clog.end()
 
-      initialUserState1
-    } else {
-      // Ask DB cache for the latest known user state for this billing period
-      val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
-        userID,
-        billingMonthInfo.year,
-        billingMonthInfo.month)
-
-      latestUserStateOpt match {
-        case None ⇒
-          // Not found, must compute
-          clog.debug("No user state found from cache, will have to (re)compute")
-          val result = doCompute
-          clog.end()
-          result
-
-        case Some(latestUserState) ⇒
-          // Found a "latest" user state but need to see if it is indeed the true and one latest.
-          // For this reason, we must count the events again.
-          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
-          val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
-            userID,
-            billingMonthStartMillis,
-            billingMonthStopMillis)
-
-          val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
-          counterDiff match {
-            // ZERO, we are OK!
-            case 0 ⇒
-              // NOTE: Keep the caller's calculation reason
-              latestUserState.copyForChangeReason(calculationReason)
-
-            // We had more, so must recompute
-            case n if n > 0 ⇒
-              clog.debug(
-                "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
-              val result = doCompute
-              clog.end()
-              result
-
-            // We had less????
-            case n if n < 0 ⇒
-              val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
-              clog.warn(errMsg)
-              throw new AquariumException(errMsg)
-          }
-      }
+      return initialUserState1
+    }
+
+    // Ask DB cache for the latest known user state for this billing period
+    val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
+      userID,
+      billingMonthInfo.year,
+      billingMonthInfo.month)
+
+    latestUserStateOpt match {
+      case None ⇒
+        // Not found, must compute
+        clog.debug("No user state found from cache, will have to (re)compute")
+        val result = computeFullMonthlyBilling
+        clog.end()
+        result
+
+      case Some(latestUserState) ⇒
+        // Found a "latest" user state but need to see if it is indeed the true and one latest.
+        // For this reason, we must count the events again.
+        val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+        val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod(
+          userID,
+          billingMonthStartMillis,
+          billingMonthStopMillis)
+
+        val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+        counterDiff match {
+          // ZERO, we are OK!
+          case 0 ⇒
+            // NOTE: Keep the caller's calculation reason
+            latestUserState.copyForChangeReason(calculationReason)
+
+          // We had more, so must recompute
+          case n if n > 0 ⇒
+            clog.debug(
+              "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+            val result = computeFullMonthlyBilling
+            clog.end()
+            result
+
+          // We had less????
+          case n if n < 0 ⇒
+            val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+            clog.warn(errMsg)
+            throw new AquariumInternalError(errMsg)
+        }
     }
   }
 
index fb11aa7..af51ede 100644 (file)
@@ -104,8 +104,10 @@ trait ResourceEventModel extends ExternalEventModel {
   }
 
   def isOutOfSyncForBillingPeriod(billingStartMillis: Long, billingStopMillis: Long): Boolean = {
-    isReceivedWithinMillis(billingStartMillis, billingStopMillis) &&
-    (occurredMillis < billingStartMillis || occurredMillis > billingStopMillis)
+    // Out-of-sync events are those received within the billing period that actually occurred outside it.
+    // NOTE(review): the replaced check treated occurredMillis equal to either bound as inside the period; confirm isOccurredWithinMillis is inclusive as well.
+     isReceivedWithinMillis(billingStartMillis, billingStopMillis) &&
+    !isOccurredWithinMillis(billingStartMillis, billingStopMillis)
   }
 
   def toDebugString(useOnlyInstanceId: Boolean = false): String = {
index e1c5143..f79afcb 100644 (file)
@@ -196,9 +196,9 @@ object Policy extends DSL with Loggable {
         receivedMillis = ts, validFrom = ts)
 
       config.policyStore.findPolicyEntry(newPolicy.id) match {
-        case Just(x) =>
+        case Some(x) =>
           logger.warn("Policy file contents not modified")
-        case NoVal =>
+        case None =>
           if (!pol.isEmpty) {
             val toUpdate = pol.last.copy(validTo = ts - 1)
             config.policyStore.updatePolicyEntry(toUpdate)
@@ -206,8 +206,6 @@ object Policy extends DSL with Loggable {
           } else {
             config.policyStore.storePolicyEntry(newPolicy)
           }
-        case failed @ Failed(e) =>
-          failed.throwMe
       }
     }
 
index f587271..572f6fa 100644 (file)
@@ -95,5 +95,5 @@ trait PolicyStore {
   /**
    * Find a policy by its unique id
    */
-  def findPolicyEntry(id: String): Maybe[PolicyEntry]
+  def findPolicyEntry(id: String): Option[PolicyEntry]
 }
\ No newline at end of file
index 787a657..f5da67c 100644 (file)
@@ -76,7 +76,7 @@ trait ResourceEventStore {
   /**
    * Count and return the number of "out of sync" events for a billing month.
    */
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long
+  def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long
 
   /**
    * Finds all relevant resource events for the billing period.
index add6394..2ffd1e2 100644 (file)
@@ -57,12 +57,10 @@ trait UserStateStore {
    */
   def findUserStateByUserID(userID: String): Option[UserState]
 
-  def findLatestUserStateByUserID(userID: String): Option[UserState]
-
   /**
    * Find the most up-to-date user state for the particular billing period.
    */
-  def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
+  def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
 
   /**
    * Delete a state for a user
index 26e4bb4..0795355 100644 (file)
@@ -111,20 +111,6 @@ class MemStore extends UserStateStore
     _userStates.find(_.userID == userID)
   }
 
-  def findLatestUserStateByUserID(userID: String) = {
-    val goodOnes = _userStates.filter(_.userID == userID)
-
-    goodOnes.sortWith {
-      case (us1, us2) ⇒
-        us1.occurredMillis > us2.occurredMillis
-    } match {
-      case head :: _ ⇒
-        Some(head)
-      case _ ⇒
-        None
-    }
-  }
-
   def findLatestUserStateForEndOfBillingMonth(userID: String,
                                               yearOfBillingMonth: Int,
                                               billingMonth: Int): Option[UserState] = {
@@ -229,7 +215,7 @@ class MemStore extends UserStateStore
     }.toList
   }
 
-  def countOutOfSyncEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
+  def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
     _resourceEvents.filter { case ev ⇒
       ev.userID == userID &&
       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
@@ -338,9 +324,8 @@ class MemStore extends UserStateStore
           p :: acc
   }
 
-  def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
-    case Some(x) => Just(x)
-    case None => NoVal
+  // Looks up the stored policy entry whose id equals `id`; the result of the
+  // underlying find is handed back as-is.
+  def findPolicyEntry(id: String) =
+    _policyEntries.find(_.id == id)
 }
 
index 0b8d209..617471f 100644 (file)
@@ -184,9 +184,22 @@ class MongoDBStore(
     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
   }
   
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
-    // FIXME: Implement
-    0L
+  /**
+   * Counts the resource events of `userID` that were received within
+   * [startMillis, stopMillis] but occurred before `startMillis` or after
+   * `stopMillis`.
+   *
+   * @param userID      the user whose events are counted
+   * @param startMillis start of the billing period (inclusive for reception)
+   * @param stopMillis  end of the billing period (inclusive for reception)
+   * @return the number of matching events in the resource events collection
+   */
+  def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
+    // received within the period.
+    // Both bounds must live in ONE condition document: adding the same key
+    // twice to the builder replaces the earlier value, which would silently
+    // drop the $gte lower bound and leave only the $lte one.
+    val receivedWithinPeriod = new BasicDBObject("$gte", startMillis).append("$lte", stopMillis)
+
+    // occurred outside the period: strictly before its start or strictly after its stop
+    val occurredOutsidePeriod = new BasicDBList()
+    occurredOutsidePeriod.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
+    occurredOutsidePeriod.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
+
+    val query = new BasicDBObjectBuilder().
+      add(ResourceEventModel.Names.userID, userID).
+      add(ResourceEventModel.Names.receivedMillis, receivedWithinPeriod).
+      add("$or", occurredOutsidePeriod).
+      get()
+
+    resourceEvents.count(query)
   }
 
   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
@@ -210,24 +223,25 @@ class MongoDBStore(
     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
     val cursor = userStates find query
 
-    withCloseable(cursor) { cursor ⇒
-      if(cursor.hasNext)
-        Some(MongoDBStore.dbObjectToUserState(cursor.next()))
-      else
-        None
-    }
-  }
-
-
-  def findLatestUserStateByUserID(userID: String) = {
-    // FIXME: implement
-    null
+    MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
   }
 
-  def findLatestUserStateForEndOfBillingMonth(userId: String,
+  def findLatestUserStateForEndOfBillingMonth(userID: String,
                                               yearOfBillingMonth: Int,
                                               billingMonth: Int): Option[UserState] = {
-    None // FIXME: implement
+    val query = new BasicDBObjectBuilder().
+      add(UserState.JsonNames.userID, userID).
+      add(UserState.JsonNames.isFullBillingMonthState, true).
+      add(UserState.JsonNames.theFullBillingMonth.year, yearOfBillingMonth).
+      add(UserState.JsonNames.theFullBillingMonth.month, billingMonth).
+      get()
+    // NOTE(review): year/month are matched by the bare names "year"/"month"; if they are stored inside a nested document, a dotted path is needed - confirm.
+    // Sort by occurredMillis descending (-1), so that the latest comes first
+    val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
+
+    val cursor = userStates.find(query).sort(sorter)
+
+    MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
   }
 
   def deleteUserState(userId: String) = {
@@ -271,27 +285,14 @@ class MongoDBStore(
     // Normally one such event is allowed ...
     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
 
-    withCloseable(cursor) { cursor ⇒
-      if(cursor.hasNext) {
-        Some(MongoDBIMEvent.fromDBObject(cursor.next()))
-      } else {
-        None
-      }
-    }
+    MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
   }
 
   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
     val query = new BasicDBObject(IMEventNames.userID, userID)
     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
 
-    withCloseable(cursor) { cursor ⇒
-      if(cursor.hasNext) {
-        Some(MongoDBIMEvent.fromDBObject(cursor.next()))
-      } else {
-        None
-      }
-    }
-
+    MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
   }
 
   /**
@@ -408,6 +409,16 @@ object MongoDBStore {
     UserState.fromJson(JSON.serialize(dbObj))
   }
 
+  /**
+   * Applies `f` to the first object of `cursor`, if there is one.
+   * The cursor is handed to `withCloseable` for the duration of the lookup.
+   *
+   * @param cursor the query cursor to read from
+   * @param f      conversion from the raw DB object to the result type
+   * @return `Some` of the converted first object, or `None` for an empty cursor
+   */
+  def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] =
+    withCloseable(cursor) { openCursor ⇒
+      if(!openCursor.hasNext) None
+      else Some(f(openCursor.next()))
+    }
+
   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
     PolicyEntry.fromJson(JSON.serialize(dbObj))
   }