WIP Resource event handling and API cleanup
author: Christos KK Loverdos <loverdos@gmail.com>
Mon, 11 Jun 2012 15:38:40 +0000 (18:38 +0300)
committer: Christos KK Loverdos <loverdos@gmail.com>
Mon, 11 Jun 2012 15:38:40 +0000 (18:38 +0300)
12 files changed:
src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/computation/reason/UserStateChangeReason.scala
src/main/scala/gr/grnet/aquarium/computation/state/UserState.scala
src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala
src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala
src/main/scala/gr/grnet/aquarium/store/IMEventStore.scala
src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

index 3c76358..deb14b8 100644 (file)
@@ -227,7 +227,7 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
 
         case ResourceEventPath(id) ⇒
           withAdminCookieHelper { responder ⇒
-            eventInfoResponse(uri, responder, aquarium.resourceEventStore.findResourceEventById, id)
+            eventInfoResponse(uri, responder, aquarium.resourceEventStore.findResourceEventByID, id)
           }
 
         case IMEventPath(id) ⇒
index 7326267..0f20383 100644 (file)
@@ -137,7 +137,7 @@ class UserActor extends ReflectiveRoleableActor {
     var _roleCheck = None: Option[String]
 
     // this._userID is already set up
-    store.replayIMEventsInOccurrenceOrder(this._userID) { imEvent ⇒
+    store.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
       DEBUG("Replaying %s", imEvent)
 
       val (creationTimeChanged, activationTimeChanged, roleChanged) = _updateIMStateRoleHistory(imEvent, _roleCheck)
@@ -288,23 +288,23 @@ class UserActor extends ReflectiveRoleableActor {
     val rcEvent = event.rcEvent
 
     if(!shouldProcessResourceEvents) {
-      // This means the user has not been activated. So, we do not process any resource event
+      // This means the user has not been created (at least, as far as Aquarium is concerned).
+      // So, we do not process any resource event
       DEBUG("Not processing %s", rcEvent.toJsonString)
       logSeparator()
 
       return
     }
 
-    this._userState.findLatestResourceEventID match {
-      case Some(id) ⇒
-        if(id == rcEvent.id) {
-          INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
-          logSeparator()
-
-          return
-        }
+    // Since the latest resource event per resource is recorded in the user state,
+    // we do not need to query the store. Just query the in-memory state.
+    // Note: This is similar to the situation with the first IMEvent received right after the user
+    //       actor is created.
+    if(this._userState.isLatestResourceEventIDEqualTo(rcEvent.id)) {
+      INFO("Ignoring first %s just after %s birth", rcEvent.toDebugString, shortClassNameOf(this))
+      logSeparator()
 
-      case _ ⇒
+      return
     }
 
     val now = TimeHelpers.nowMillis()
@@ -341,7 +341,9 @@ class UserActor extends ReflectiveRoleableActor {
 
     this._userState = aquarium.userStateComputations.doMonthBillingUpTo(
       billingMonthInfo,
-      now max eventOccurredMillis, // take into account that the event may be out-of-sync
+      // Take into account that the event may be out-of-sync.
+      // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
+      now max eventOccurredMillis,
       userStateBootstrap,
       currentResourcesMap,
       calculationReason,
index d9b85ab..56205dc 100644 (file)
@@ -183,7 +183,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
       currentResourceEvent: ResourceEventModel,
       stateChangeReason: UserStateChangeReason,
       billingMonthInfo: BillingMonthInfo,
-      walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+      walletEntryRecorder: NewWalletEntry ⇒ Unit,
       clogOpt: Option[ContextualLogger] = None
   ): UserState = {
 
@@ -194,6 +194,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
     val theResource = currentResourceEvent.safeResource
     val theInstanceId = currentResourceEvent.safeInstanceId
     val theValue = currentResourceEvent.value
+    val theDetails = currentResourceEvent.details
 
     val resourcesMap = userStateWorker.resourcesMap
 
@@ -234,9 +235,9 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
             val oldCredits = _workingUserState.totalCredits
 
             // A. Compute new resource instance accumulating amount
-            val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
+            val newAccumulatingAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue, theDetails)
 
-            clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
+            clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAccumulatingAmount, oldCredits)
 
             // B. Compute new wallet entries
             clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
@@ -248,7 +249,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
               currentResourceEvent,
               oldCredits,
               oldAmount,
-              newAmount,
+              newAccumulatingAmount,
               dslResource,
               resourcesMap,
               alltimeAgreements,
@@ -288,7 +289,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
               )
               clog.debug("New %s", newWalletEntry)
 
-              walletEntriesBuffer += newWalletEntry
+              walletEntryRecorder.apply(newWalletEntry)
             } else {
               clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
             }
@@ -324,7 +325,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
       userStateWorker: UserStateWorker,
       stateChangeReason: UserStateChangeReason,
       billingMonthInfo: BillingMonthInfo,
-      walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+      walletEntryRecorder: NewWalletEntry ⇒ Unit,
       clogOpt: Option[ContextualLogger] = None
   ): UserState = {
 
@@ -338,7 +339,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
         currentResourceEvent,
         stateChangeReason,
         billingMonthInfo,
-        walletEntriesBuffer,
+        walletEntryRecorder,
         clogOpt
       )
     }
@@ -391,19 +392,24 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
       "doMonthBillingUpTo(%s)", new MutableDateCalc(billingEndTimeMillis).toYYYYMMDDHHMMSSSSS)
     clog.begin()
 
-    clog.debug("calculationReason = %s", calculationReason)
+    clog.debug("%s", calculationReason)
 
     val clogSome = Some(clog)
 
+    val previousBillingMonthInfo = billingMonthInfo.previousMonth
     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
       userStateBootstrap,
-      billingMonthInfo.previousMonth,
+      previousBillingMonthInfo,
       defaultResourcesMap,
       calculationReason,
       storeFunc,
       clogSome
     )
 
+    clog.debug("previousBillingMonthUserState(%s) = %s".format(
+      previousBillingMonthInfo.toShortDebugString,
+      previousBillingMonthUserState.toJsonString))
+
     val startingUserState = previousBillingMonthUserState
 
     // Keep the working (current) user state. This will get updated as we proceed with billing for the month
@@ -416,23 +422,34 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
 
     // First, find and process the actual resource events from DB
-    val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
+    val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
+    val walletEntryRecorder = (nwe: NewWalletEntry) ⇒ {
+      newWalletEntries.append(nwe)
+    }
+
+    var _rcEventsCounter = 0
+    resourceEventStore.foreachResourceEventOccurredInPeriod(
       userID,
       billingMonthInfo.monthStartMillis, // from start of month
       billingEndTimeMillis               // to requested time
-    )
+    ) { currentResourceEvent ⇒
 
-    val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
+      clog.debug("Processing %s".format(currentResourceEvent))
 
-    _workingUserState = processResourceEvents(
-      allResourceEventsForMonth,
-      _workingUserState,
-      userStateWorker,
-      calculationReason,
-      billingMonthInfo,
-      newWalletEntries,
-      clogSome
-    )
+      _workingUserState = processResourceEvent(
+        _workingUserState,
+        userStateWorker,
+        currentResourceEvent,
+        calculationReason,
+        billingMonthInfo,
+        walletEntryRecorder,
+        clogSome
+      )
+
+      _rcEventsCounter += 1
+    }
+
+    clog.debug("Found %s resource events for month %s".format(_rcEventsCounter, billingMonthInfo.toShortDebugString))
 
     if(isFullMonthBilling) {
       // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
@@ -463,7 +480,7 @@ final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
         specialUserStateWorker,
         calculationReason,
         billingMonthInfo,
-        newWalletEntries,
+        walletEntryRecorder,
         clogSome
       )
     }
index 0f9d072..47b87fc 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium.computation.reason
 import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.util.shortClassNameOf
+import gr.grnet.aquarium.util.json.JsonSupport
 
 /**
  * Provides information explaining the reason Aquarium calculated a new
@@ -47,7 +48,7 @@ case class UserStateChangeReason(
     details: Map[String, String],
     billingMonthInfo: Option[BillingMonthInfo],
     parentReason: Option[UserStateChangeReason]
-) {
+) extends JsonSupport {
 
   require(
     details.contains(UserStateChangeReason.Names.name),
index e84a23b..8ffe17c 100644 (file)
@@ -219,6 +219,10 @@ case class UserState(
     latestResourceEventsSnapshot.findTheLatestID
   }
 
+  /**
+   * Tells whether the latest recorded resource event ID, if there is one, equals `toCheckID`.
+   * Yields `false` when no latest resource event ID is available.
+   */
+  def isLatestResourceEventIDEqualTo(toCheckID: String) = {
+    findLatestResourceEventID.exists(latestID ⇒ latestID == toCheckID)
+  }
+
   //  def toShortString = "UserState(%s, %s, %s, %s, %s)".format(
   //    userId,
   //    _id,
index 421daf9..7ad47c3 100644 (file)
@@ -105,7 +105,7 @@ class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
         //    It is a requirement that this ID is unique.
         val store = aquarium.resourceEventStore
-        store.findResourceEventById(id) match {
+        store.findResourceEventByID(id) match {
           case Some(_) ⇒
             // Reject the duplicate
             Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
index fadf433..70a0b4d 100644 (file)
@@ -105,17 +105,21 @@ abstract class DSLCostPolicy(val name: String, val vars: Set[DSLCostPolicyVar])
   }
 
   /**
-   * Given the old amount of a resource instance (see [[gr.grnet.aquarium.computation.data.ResourceInstanceSnapshot]]) and the
-   * value arriving in a new resource event, compute the new instance amount.
+   * Given the old amount of a resource instance
+   * (see [[gr.grnet.aquarium.computation.state.parts.ResourceInstanceSnapshot]]), the
+   * value arriving in a new resource event and the new details, compute the new instance amount.
    *
    * Note that the `oldAmount` does not make sense for all types of [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]],
    * in which case it is ignored.
    *
-   * @param oldAmount the old accumulating amount
-   * @param newEventValue the value contained in a newly arrived [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]
+   * @param oldAmount     the old accumulating amount
+   * @param newEventValue the value contained in a newly arrived
+   *                      [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]
+   * @param details       the `details` of the newly arrived
+   *                      [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]
    * @return
    */
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double): Double
+  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]): Double
 
   def computeResourceInstanceAmountForNewBillingPeriod(oldAmount: Double): Double
 
@@ -237,7 +241,9 @@ case object OnceCostPolicy
 
   def isBillableFirstEventBasedOnValue(eventValue: Double) = true
 
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double) = oldAmount
+  // The accumulating amount is left untouched: the new event value and the details are both ignored.
+  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]) = {
+    oldAmount
+  }
 
   def computeResourceInstanceAmountForNewBillingPeriod(oldAmount: Double) = getResourceInstanceInitialAmount
 
@@ -269,8 +275,15 @@ case object ContinuousCostPolicy
   extends DSLCostPolicy(DSLCostPolicyNames.continuous,
                         Set(DSLCostPolicyNameVar, DSLUnitPriceVar, DSLOldTotalAmountVar, DSLTimeDeltaVar)) {
 
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double): Double = {
-    oldAmount + newEventValue
+  /**
+   * Computes the new accumulating amount. An explicit `"total"` entry in `details` wins
+   * (parsed as a `Double`); without one, the new event value is added to the old amount.
+   */
+  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]): Double = {
+    val reportedTotal = details.get("total").map(_.toDouble)
+    reportedTotal.getOrElse(oldAmount + newEventValue)
+  }
 
   def computeResourceInstanceAmountForNewBillingPeriod(oldAmount: Double): Double = {
@@ -331,7 +344,7 @@ case object OnOffCostPolicy
    * @param newEventValue
    * @return
    */
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double): Double = {
+  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]): Double = {
+    // The new event value replaces the accumulated amount outright; `oldAmount` and `details` are not consulted.
     newEventValue
   }
   
@@ -433,7 +446,7 @@ object OnOffCostPolicyValues {
 case object DiscreteCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.discrete,
                                                      Set(DSLCostPolicyNameVar, DSLUnitPriceVar, DSLCurrentValueVar)) {
 
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double): Double = {
+  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]): Double = {
+    // The new event value is added on top of the old amount; `details` is not consulted.
     oldAmount + newEventValue
   }
 
index c369714..8cb6bb5 100644 (file)
@@ -54,7 +54,7 @@ case class UserSim(userID: String, userCreationDate: Date, aquarium: AquariumSim
   }
 
   def myResourceEvents: List[ResourceEventModel] = {
-    resourceEventStore.findResourceEventsByUserId(userID)(None)
+    resourceEventStore.findResourceEventsByUserID(userID)(None)
   }
 
   def myResourceEventsByReceivedDate: List[ResourceEventModel] = {
index 231049e..74ca318 100644 (file)
@@ -76,16 +76,10 @@ trait IMEventStore {
   def findLatestIMEventByUserID(userID: String): Option[IMEvent]
 
   /**
-   * Find the very first activation event for a particular user.
-   *
-   */
-  def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent]
-
-  /**
    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
    * the given function `f`.
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def replayIMEventsInOccurrenceOrder(userID: String)(f: IMEvent ⇒ Unit): Unit
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: IMEvent ⇒ Unit): Unit
 }
\ No newline at end of file
index f5da67c..99f6d2f 100644 (file)
@@ -35,8 +35,8 @@
 
 package gr.grnet.aquarium.store
 
-import gr.grnet.aquarium.AquariumException
 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
+import gr.grnet.aquarium.AquariumInternalError
 
 /**
  * An abstraction for Aquarium `ResourceEvent` stores.
@@ -50,44 +50,27 @@ trait ResourceEventStore {
   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent
 
   def clearResourceEvents(): Unit = {
-    throw new AquariumException("Unsupported operation")
+    // This method is implemented only in MemStore.
+    throw new AquariumInternalError("Unsupported operation")
   }
 
   def pingResourceEventStore(): Unit
 
   def insertResourceEvent(event: ResourceEventModel): ResourceEvent
 
-  def findResourceEventById(id: String): Option[ResourceEvent]
+  def findResourceEventByID(id: String): Option[ResourceEvent]
 
-  def findResourceEventsByUserId(userId: String)(sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent]
+  def findResourceEventsByUserID(userID: String)(sortWith: Option[(ResourceEvent, ResourceEvent) ⇒ Boolean]): List[ResourceEvent]
 
   /**
-   * Returns the events for the given User after (or equal) the given timestamp.
-   *
-   * The events are returned in ascending timestamp order.
-   */
-  def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent]
-  
-  def findResourceEventHistory(userId: String, resName: String,
-                               instid: Option[String], upTo: Long) : List[ResourceEvent]
-
-  def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent]
-
-  /**
-   * Count and return the number of "out of sync" events for a billing month.
+   * Counts and returns the number of "out of sync" events for a billing period.
+   * Note that we assume billing months for now. So, do not cross the month border in the provided time-stamps.
    */
   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long
 
-  /**
-   * Finds all relevant resource events for the billing period.
-   * The relevant events are those:
-   * a) whose `occurredMillis` is within the given billing period or
-   * b) whose `receivedMillis` is within the given billing period.
-   *
-   * Order them by `occurredMillis`
-   * FIXME: implement
-   */
-  def findAllRelevantResourceEventsForBillingPeriod(userId: String,
-                                                    startMillis: Long,
-                                                    stopMillis: Long): List[ResourceEvent]
+  /**
+   * Runs `f` on each resource event of the given user, for the period bounded by
+   * `startMillis` and `stopMillis`.
+   *
+   * NOTE(review): presumably events are selected by `occurredMillis` and handed to `f` in
+   * ascending `occurredMillis` order, as with `foreachIMEventInOccurrenceOrder` — confirm
+   * against each implementation.
+   */
+  def foreachResourceEventOccurredInPeriod(
+      userID: String,
+      startMillis: Long,
+      stopMillis: Long
+  )(f: ResourceEvent ⇒ Unit): Unit
 }
\ No newline at end of file
index f61ade6..5744876 100644 (file)
@@ -167,11 +167,11 @@ class MemStore extends UserStateStore
     localEvent
   }
 
-  def findResourceEventById(id: String) = {
+  def findResourceEventByID(id: String) = {
     _resourceEvents.find(ev ⇒ ev.id == id)
   }
 
-  def findResourceEventsByUserId(userId: String)
+  def findResourceEventsByUserID(userId: String)
                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
     val sorted = sortWith match {
@@ -184,29 +184,6 @@ class MemStore extends UserStateStore
     sorted.toList
   }
 
-  def findResourceEventsByUserIdAfterTimestamp(userID: String, timestamp: Long): List[ResourceEvent] = {
-    _resourceEvents.filter { ev ⇒
-      ev.userID == userID &&
-      (ev.occurredMillis > timestamp)
-    }.toList
-  }
-
-  def findResourceEventHistory(userId: String,
-                               resName: String,
-                               instid: Option[String],
-                               upTo: Long): List[ResourceEvent] = {
-    Nil
-  }
-
-  def findResourceEventsForReceivedPeriod(userID: String,
-                                          startTimeMillis: Long,
-                                          stopTimeMillis: Long): List[ResourceEvent] = {
-    _resourceEvents.filter { ev ⇒
-      ev.userID == userID &&
-      ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
-    }.toList
-  }
-
   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
     _resourceEvents.filter { case ev ⇒
       ev.userID == userID &&
@@ -215,24 +192,18 @@ class MemStore extends UserStateStore
       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
     }.size.toLong
   }
+  //- ResourceEventStore
 
-  /**
-   * Finds all relevant resource events for the billing period.
-   * The relevant events are those:
-   * a) whose `occurredMillis` is within the given billing period or
-   * b) whose `receivedMillis` is within the given billing period.
-   *
-   * Order them by `occurredMillis`
-   */
-  override def findAllRelevantResourceEventsForBillingPeriod(userID: String,
-                                                             startMillis: Long,
-                                                             stopMillis: Long): List[ResourceEvent] = {
+  /**
+   * Runs `f` on every resource event of `userID` accepted by
+   * `isOccurredWithinMillis(startMillis, stopMillis)`, in ascending `occurredMillis` order.
+   *
+   * The events are sorted explicitly: the MongoDB-backed store sorts by `occurredMillis`
+   * and the billing computation folds its state over the events one by one, so the
+   * in-memory store must not depend on insertion order.
+   */
+  def foreachResourceEventOccurredInPeriod(
+      userID: String,
+      startMillis: Long,
+      stopMillis: Long
+  )(f: ResourceEvent ⇒ Unit): Unit = {
     _resourceEvents.filter { case ev ⇒
       ev.userID == userID &&
-      ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
-    }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
+      ev.isOccurredWithinMillis(startMillis, stopMillis)
+    }.toList.sortBy(_.occurredMillis).foreach(f) // stable sort: ties keep their stored order
   }
-  //- ResourceEventStore
 
   //+ IMEventStore
   def createIMEventFromJson(json: String) = {
@@ -274,27 +245,13 @@ class MemStore extends UserStateStore
     } headOption
   }
 
-  def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
-    imEventById.valuesIterator.filter { case ev ⇒
-      ev.userID == userID && ev.isActive
-    }.toList.sortWith { case (ev1, ev2) ⇒
-      ev1.occurredMillis <= ev2.occurredMillis
-    } match {
-      case head :: _ ⇒
-        Some(head)
-
-      case _ ⇒
-        None
-    }
-  }
-
   /**
    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
    * the given function `f`.
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
     } foreach(f)
index 458e5fe..cf42f6e 100644 (file)
@@ -112,80 +112,17 @@ class MongoDBStore(
     localEvent
   }
 
-  def findResourceEventById(id: String): Option[ResourceEvent] = {
+  def findResourceEventByID(id: String): Option[ResourceEvent] = {
     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
   }
 
-  def findResourceEventsByUserId(userId: String)
+  def findResourceEventsByUserID(userId: String)
                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
     val query = new BasicDBObject(ResourceEventNames.userID, userId)
 
     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
   }
 
-  def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
-    val query = new BasicDBObject()
-    query.put(ResourceEventNames.userID, userId)
-    query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
-    
-    val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
-
-    val cursor = resourceEvents.find(query).sort(sort)
-
-    try {
-      val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
-      while(cursor.hasNext) {
-        buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
-      }
-      buffer.toList.sortWith(_sortByTimestampAsc)
-    } finally {
-      cursor.close()
-    }
-  }
-
-  def findResourceEventHistory(userId: String, resName: String,
-                               instid: Option[String], upTo: Long) : List[ResourceEvent] = {
-    val query = new BasicDBObject()
-    query.put(ResourceEventNames.userID, userId)
-    query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
-    query.put(ResourceEventNames.resource, resName)
-
-    instid match {
-      case Some(id) =>
-        Policy.policy.findResource(resName) match {
-          case Some(y) => query.put(ResourceEventNames.details,
-            new BasicDBObject(y.descriminatorField, instid.get))
-          case None =>
-        }
-      case None =>
-    }
-
-    val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
-    val cursor = resourceEvents.find(query).sort(sort)
-
-    try {
-      val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
-      while(cursor.hasNext) {
-        buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
-      }
-      buffer.toList.sortWith(_sortByTimestampAsc)
-    } finally {
-      cursor.close()
-    }
-  }
-
-  def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
-    val query = new BasicDBObject()
-    query.put(ResourceEventNames.userID, userId)
-    query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
-    query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
-
-    // Sort them by increasing order for occurred time
-    val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
-
-    MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
-  }
-  
   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
     val query = new BasicDBObjectBuilder().
       add(ResourceEventModel.Names.userID, userID).
@@ -204,11 +141,29 @@ class MongoDBStore(
     resourceEvents.count(query)
   }
 
-  def findAllRelevantResourceEventsForBillingPeriod(userId: String,
-                                                    startMillis: Long,
-                                                    stopMillis: Long): List[ResourceEvent] = {
-    // FIXME: Implement
-    Nil
+  /**
+   * Runs `f` on every resource event of `userID` whose `occurredMillis` lies in
+   * [`startMillis`, `stopMillis`] (both bounds inclusive), in ascending `occurredMillis` order.
+   *
+   * The cursor is handed to `withCloseable` for the whole iteration.
+   */
+  def foreachResourceEventOccurredInPeriod(
+      userID: String,
+      startMillis: Long,
+      stopMillis: Long
+  )(f: ResourceEvent ⇒ Unit): Unit = {
+
+    // Both bounds must live in ONE sub-document: adding the same key twice to the builder
+    // keeps only the last value, which would silently drop the `$gte` lower bound.
+    val occurredRange = new BasicDBObject("$gte", startMillis).append("$lte", stopMillis)
+    val query = new BasicDBObjectBuilder().
+      add(ResourceEventModel.Names.userID, userID).
+      add(ResourceEventModel.Names.occurredMillis, occurredRange).
+      get()
+
+    val sorter = new BasicDBObject(ResourceEventModel.Names.occurredMillis, 1)
+    val cursor = resourceEvents.find(query).sort(sorter)
+
+    withCloseable(cursor) { cursor ⇒
+      while(cursor.hasNext) {
+        val nextDBObject = cursor.next()
+        val nextEvent = MongoDBResourceEvent.fromDBObject(nextDBObject)
+
+        f(nextEvent)
+      }
+    }
+  }
   //-ResourceEventStore
 
@@ -291,32 +246,12 @@ class MongoDBStore(
   }
 
   /**
-   * Find the very first activation event for a particular user.
-   *
-   */
-  def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
-    val query = new BasicDBObjectBuilder().
-      add(IMEventNames.userID, userID).
-      add(IMEventNames.isActive, true).get()
-
-    val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
-
-    withCloseable(cursor) { cursor ⇒
-      if(cursor.hasNext) {
-        Some(MongoDBIMEvent.fromDBObject(cursor.next()))
-      } else {
-        None
-      }
-   }
-  }
-
-  /**
    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
    * the given function `f`.
    *
    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
    */
-  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+  def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
     val query = new BasicDBObject(IMEventNames.userID, userID)
     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))