Towards the new billing and state computation algorithms billing
authorChristos KK Loverdos <loverdos@gmail.com>
Thu, 12 Jan 2012 13:59:52 +0000 (15:59 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Thu, 12 Jan 2012 13:59:52 +0000 (15:59 +0200)
src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala
src/main/scala/gr/grnet/aquarium/query/QueryResult.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemUserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserState.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

index e753155..cc0c2c0 100644 (file)
@@ -46,7 +46,7 @@ case class  UserEvent(
    */
   def validate: Boolean = {
 
-    MasterConfigurator.userStateStore.findUserStateByUserId(userId) match {
+    MasterConfigurator.userStateStore.findLatestUserState(userId) match {
       case Just(x) =>
         if (eventType == 1){
           logger.warn("User to create exists: IMEvent".format(this.toJson));
diff --git a/src/main/scala/gr/grnet/aquarium/query/QueryResult.scala b/src/main/scala/gr/grnet/aquarium/query/QueryResult.scala
new file mode 100644 (file)
index 0000000..8674263
--- /dev/null
@@ -0,0 +1,18 @@
+package gr.grnet.aquarium.query
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+sealed abstract class QueryResult[+R]
+
+case class HaveOne[+R](result: R) extends QueryResult[R]
+case object HaveNoMore extends QueryResult[Nothing]
+case object RequestedNoMore extends QueryResult[Nothing]
+
+sealed abstract class ProcessResult[+R]
+
+case object FetchMore extends ProcessResult[Nothing]
+case object FetchNoMore extends ProcessResult[Nothing]
+case class ReturnResult[+R](result: R) extends ProcessResult[R]
index 5d3ea4c..609001e 100644 (file)
@@ -57,4 +57,9 @@ trait ResourceEventStore {
    * The events are returned in ascending timestamp order.
    */
   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent]
+
+  /**
+   * The timestamps are used inclusively
+   */
+  def findResourceEventsForPeriod(userId: String, startTime: Long, stopTime: Long): List[ResourceEvent]
 }
\ No newline at end of file
index a9cbe14..61f957f 100644 (file)
@@ -48,6 +48,6 @@ import com.ckkloverdos.maybe.Maybe
 
 trait UserStateStore {
   def storeUserState(userState: UserState): Maybe[RecordID]
-  def findUserStateByUserId(userId: String): Maybe[UserState]
+  def findLatestUserState(userId: String): Maybe[UserState]
   def deleteUserState(userId: String): Unit
 }
\ No newline at end of file
index 778b3d6..3ede106 100644 (file)
@@ -62,7 +62,7 @@ class MemUserStateStore extends UserStateStore with Configurable {
     Just(RecordID(userId))
   }
 
-  def findUserStateByUserId(userId: String) = {
+  def findLatestUserState(userId: String) = {
     userStateByUserId.get(userId) match {
       case null       ⇒ NoVal
       case userStateJ ⇒ userStateJ
index 18957f1..91540e6 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium.store.mongodb
 import gr.grnet.aquarium.util.Loggable
 import com.mongodb.util.JSON
 import gr.grnet.aquarium.user.UserState
+import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames}
 import gr.grnet.aquarium.util.displayableObjectInfo
 import gr.grnet.aquarium.util.json.JsonSupport
 import collection.mutable.ListBuffer
@@ -46,8 +47,8 @@ import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNa
 import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames}
 import java.util.Date
 import com.ckkloverdos.maybe.Maybe
-import com.mongodb._
 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent}
+import com.mongodb._
 
 /**
  * Mongodb implementation of the various aquarium stores.
@@ -122,16 +123,28 @@ class MongoDBStore(
       cursor.close()
     }
   }
+
+  def findResourceEventsForPeriod(userId: String, startTime: Long, stopTime: Long): List[ResourceEvent] = {
+    val query = new BasicDBObject()
+    query.put(ResourceJsonNames.userId, userId)
+    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gte", startTime))
+    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lte", stopTime))
+
+    val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
+
+    MongoDBStore.runQuery[ResourceEvent](query, rcEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None)
+  }
   //-ResourceEventStore
 
   //+UserStateStore
   def storeUserState(userState: UserState): Maybe[RecordID] =
     MongoDBStore.storeUserState(userState, userStates)
 
-  def findUserStateByUserId(userId: String): Maybe[UserState] = {
+  def findLatestUserState(userId: String): Maybe[UserState] = {
     Maybe {
-      val query = new BasicDBObject(ResourceJsonNames.userId, userId)
-      val cursor = userStates find query
+      val query = new BasicDBObject(UserStateJsonNames.userId, userId)
+      val orderBy = new BasicDBObject(UserStateJsonNames.snapshotMillis, -1)
+      val cursor = userStates.find(query).sort(orderBy)
 
       try {
         if(cursor.hasNext)
index 3bc4b47..d1574a2 100644 (file)
@@ -90,9 +90,34 @@ case class ResourceInstanceSnapshot(
     this.name == name &&
     this.instanceId == instanceId
 }
+
+/**
+ * A map from (resourceName, resourceInstanceId) to (value, snapshotTime).
+ * This representation is convenient for computations and updating, while the
+ * [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]] representation is convenient for JSON serialization.
+ */
+class OwnedResourcesMap(map: Map[(String, String), (Float, Long)]) {
+  def toResourcesSnapshot(snapshotTime: Long): OwnedResourcesSnapshot =
+    OwnedResourcesSnapshot(
+      map map {
+        case ((name, instanceId), (value, snapshotTime)) ⇒
+          ResourceInstanceSnapshot(name, instanceId, value, snapshotTime
+      )} toList,
+      snapshotTime
+    )
+}
+
 case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshotTime: Long)
   extends UserDataSnapshot[List[ResourceInstanceSnapshot]] with JsonSupport {
 
+  def toResourcesMap: OwnedResourcesMap = {
+    val tuples = for {
+      rc <- data
+    } yield ((rc.name, rc.instanceId), (rc.value, rc.snapshotTime))
+
+    new OwnedResourcesMap(Map(tuples.toSeq: _*))
+  }
+
   def findResourceSnapshot(name: String, instanceId: String): Option[ResourceInstanceSnapshot] =
     data.find { x => name.equals(x.name) && instanceId.equals(x.instanceId) }
 
index 0ebb45b..b36dc6f 100644 (file)
@@ -57,6 +57,17 @@ import gr.grnet.aquarium.logic.accounting.Policy
 
 case class UserState(
     userId: String,
+
+    /**
+     * When the user was created in the system (not Aquarium). We use this as a basis for billing periods. Set to
+     * zero if unknown.
+     */
+    startDate: Long,
+
+    /**
+     * The `occurredMillis` of the last processed message.
+     */
+    lastProcessedOccurredMillis: Long,
     active: ActiveSuspendedSnapshot,
     credits: CreditSnapshot,
     agreement: AgreementSnapshot,
@@ -64,7 +75,8 @@ case class UserState(
     paymentOrders: PaymentOrdersSnapshot,
     ownedGroups: OwnedGroupsSnapshot,
     groupMemberships: GroupMembershipsSnapshot,
-    ownedResources: OwnedResourcesSnapshot
+    ownedResources: OwnedResourcesSnapshot,
+    snapshotMillis: Long = 0
 ) extends JsonSupport {
 
   private[this] def _allSnapshots: List[Long] = {
@@ -90,6 +102,9 @@ case class UserState(
        Failed(new Exception("No agreement snapshot found for user %s".format(userId)))
     }
   }
+
+  def resourcesMap = ownedResources.toResourcesMap
+
   
   def safeCredits = credits match {
     case c @ CreditSnapshot(date, millis) ⇒ c
@@ -114,4 +129,10 @@ object UserState {
   def fromXml(xml: String): UserState = {
     fromJValue(Xml.toJson(scala.xml.XML.loadString(xml)))
   }
+
+  object JsonNames {
+    final val _id = "_id"
+    final val userId = "userId"
+    final val snapshotMillis = "snapshotMillis"
+  }
 }
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
new file mode 100644 (file)
index 0000000..6c647de
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.user
+
+import gr.grnet.aquarium.store.ResourceEventStore
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+object UserStateComputations {
+  def createBlankState(userId: String, agreementName: String = "default") = {
+    val now = 0L
+    UserState(
+      userId,
+      now,
+      now,
+      ActiveSuspendedSnapshot(false, now),
+      CreditSnapshot(0, now),
+      AgreementSnapshot(agreementName, now),
+      RolesSnapshot(List(), now),
+      PaymentOrdersSnapshot(Nil, now),
+      OwnedGroupsSnapshot(Nil, now),
+      GroupMembershipsSnapshot(Nil, now),
+      OwnedResourcesSnapshot(List(), now)
+    )
+  }
+
+  def computeBillingState(initialUserState: UserState, startMillis: Long, stopMillis: Long,
+                          rcEventStore: ResourceEventStore): UserState = {
+
+    val userId = initialUserState.userId
+    val rcEvents = rcEventStore.findResourceEventsForPeriod(userId, startMillis, stopMillis)
+
+    rcEvents.foldLeft[UserState](initialUserState) {
+      case (userState, rcEvent) ⇒
+        userState
+    }
+  }
+}
\ No newline at end of file
index bb28536..743de4a 100644 (file)
@@ -74,31 +74,8 @@ class UserActor extends AquariumActor
     justProcessTheResourceEvent(resourceEvent, "ACTUAL")
   }
 
-
-  /**
-   * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
-   */
-  def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    Nil
-  }
-
-  def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    Nil
-  }
-
   /**
-   * Find resource events that precede the given one and are unprocessed.
-   */
-  private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    if(rcEvent.isOnOffEvent(policy)) {
-      findOlderResourceEventsForOnOff(rcEvent, policy)
-    } else {
-      findOlderResourceEventsForOther(rcEvent, policy)
-    }
-  }
-
-  /**
-   * Terminology:
+   * Terminology
    *
    * - Credits
    *   The analog of money. Credits are the `universal money` within Aquarium.
@@ -112,9 +89,9 @@ class UserActor extends AquariumActor
    *   An owner of resources and credits.
    *
    * - Resource event
-   *   An event that is generated by a system, which is responsible for the resource and describes a
-   *   state change for the resource. In particular, a resource event records the time when that state
-   *   change occurred (`occurredMillis` attribute) and the changed value (`value` attribute).
+   *   An event that is generated by a system, which is responsible for the resource.
+   *   The resource event describes a state change for the resource. In particular, a resource event records the time
+   *   when that state change occurred (`occurredMillis` attribute) and the changed value (`value` attribute).
    *
    * - Resource event store
    *   A datatabase, in the general sense, where resource events are stored.
@@ -139,96 +116,70 @@ class UserActor extends AquariumActor
    * - "Off" resource event
    *   An "on/off" event which actually records the "off" state for the particular resource it refers to.
    *
-   * - Wallet entry
-   *   The entity that describes an accounting event and its corresponding credit value.
-   *   For example, downloading files uses the download bandwidth and this costs credits. The exact
-   *   cost, which is determined using the corresponding cost policy of the event, is recorded
-   *   in one or more wallet entries.
-   *
-   * - Finalized wallet entry
-   *   It is a wallet entry whose recorded credits (its credit value) can be taken into account for
-   *   billing the respective user (the resource owner).
-   *   When we say that a wallet entry is in a finalized state, we mean that it is a finalized wallet entry, as
-   *   described above.
-   *
-   * - Pending or Non-finalized wallet entry
-   *   It is a wallet entry whose recorded credits (probably having a zero value) cannot be taken into
-   *   account for billing the respective resource owner. Pending wallet entries are introduced to handle
-   *   the "on/off" resource events.
-   *   When we say that a wallet entry is in/having a pending state, we mean that it is in a pending or
-   *   non-finalized wallet entry, as described above.
-   *
-   * - Wallet store
-   *   A database, in the general sense, where wallet entries are stored.
-   *
    * - User Bill
    *   A, usually, periodic statement of how many credits the user has consumed.
    *   It may contain detailed analysis that relates consumed credits to resources.
    *
-   * - Billable or Chargeable wallet entry
-   *   A wallet entry that can participate in the creation of the user Bill.
-   *   A wallet entry is billable if and only if it is finalized.
-   *
-   * - Processed resource event
-   *   A resource event which has generated wallet entries.
-   *   We may also designate a processed resource event as `partially processed` in order to distinguish it
-   *   from a `fully processed` resource event.
-   *
-   * - Fully processed resource event
-   *   A resource event which has no pending wallet entries or has no pending wallet entries with corresponding new
-   *   and finalized wallet entries (the meaning of the `corresponding` wallet entries will become
-   *   obvious in action A2 below).
-   *
-   *
-   * When an event is processed, there are three possible actions as an outcome, namely A1, A2, A3:
-   *
-   * - A1. Wallet entries are created. This happens *always*. Each resource event may create more than one
-   *       wallet entries. The credit value associated with each wallet entry may be zero credits but we
-   *       always generate wallet entries even those with zero credit values.
-   *
-   *       We say that the event that causes the generation of wallet entries is the `generator` event.
-   *
-   *       The semantics of wallet entry creation depend on whether the original event is an "on/off" event
-   *       or not.
-   *
-   *       - If the event is an "on/off" event, then
-   *         - If it is an "on" event, then one and only one new wallet entry is created having a pending state.
-   *           The credit value of this wallet entry is zero and its `occurredMillis` attribute is the same
-   *           as the `occurredMillis` attribute of the generator event.
-   *
-   *           So what we actually do in this case is to record the "on" state with a non-billable wallet entry.
-   *           ==> TODO: How is tis related to the (later) expecting "off" state and corresponding (to the "off"
-   *               TODO: state) wallet entries?
-   *
-   *         - If it is an "off" event, then we need to find the corresponding "on" event.
-   *           As long as we have the corresponding "on" event, then based on the relative timespan between
-   *           the two "on/off" events and the corresponding resource policy we compute a series of *new*
-   *           and *finalized* wallet entries. Each new wallet entry has an `occurredMillis` attribute equal to the
-   *           `occurredMillis` attribute of the original, pending, wallet entry. We then say that the new
-   *           wallet entries _correspond_ to the original, pending wallet entry.
-   *           ==> TODO: verify that the semantics of `occurredMillis` for the new wallet entries, as defined
-   *               TODO: above, are correct.
-   *
-   *           ==> TODO: What do we do with the old, pending wallet entry? Do we keep it or do we delete it?
-   *
-   *           The new wallet entries are generated according to Algorithm W2.
-   *
-   *       - If the event is not an "on/off" event then, by design, it can directly generate one or more
-   *         finalized wallet entries. These wallet entries are generated by Algorithm W1.
+   * - Billing period
+   *   A time period at the end of which we issue a user bill.
+   *   A billing period is made of a starting date and a duration that is a multiple of a week.
+   *   A usual billing period starts on a particular month date (eg. 3rd) and lasts for a month.
+   *   Each resource type designates what happens to its accumulated value (if any) at the beginning of the
+   *   billing period. Usually, at the beginning of the billing period, the accumulating amounts of resources are set
+   *   to zero. In the general case, there is a function that tells us exactly to which value to set the
+   *   accumulating amount. For example, for a monthly billing period, the total uploading bandwidth is reset to zero
+   *   every month.
+   *
+   * - User state
+   *   The user state is made of the following distinct parts (the first two can be integrated but not our concern
+   *   right now):
+   *   - User credit state, that is the total user credits
+   *   - User resource state, that is:
+   *     - For each resource type (owned by the user)
+   *       - For each resource instance (owned by the user)
+   *         - The particular accumulating value associated with the instance.
+   *           For example, regarding uploading bandwidth, the total uploading bandwidth in MB.
+   *   - Processing state, made of:
+   *     - The `occurredMillis` of the last processed resource event
+   *     - The `id` of the last processed resource event
+   *       ==> TODO: do we need this?
+   *
+   * - Last known user state
+   *   At periodic intervals, the current user state snapshot is saved. The most recent record of such user state is
+   *   the last known user state.
+   *
+   * - Resource event processing
+   *   The procedure by which a resource event leads to state changes of the user state.
+   *
+   * We process a resource event by taking the following actions:
+   *
+   * - A1. Compute the relevant changes in the user resource state. In particular:
+   *       - If the resource event is "on/off"
+   *         - If it is "on" then ignore it
+   *         - If it is "off" then
+   *           - Find the previous relevant "on" event
+   *           - Use the "on/off" pair to compute the respective changes in the user resource state
+   *       - If the resource is not "on/off"
+   *         - Use it to compute the respective changes in the user resource state.
+   * - A2. Compute the relevant changes in the user credit state.
+   *       - If the resource is "on/off"
+   *         - If this is an "on" event, ignore it
+   *            ==> TODO: could make "on" events part of the user state, so as to provisionally compute
+   *                TODO: credit diffs as if hypothetical "off" events arrive at the moment of
+   *                TODO: state diff calculation.
+   *         - If it is an "off" event
+   *           - Find the previous relevant "on" event
+   *           - Use the "on/off" pair to compute the respective changes in the user credit state
+   *       - If the resource is continuous...
+   *       - If the resource is discrete...
+   *
+   * For Step A1 above, care must be taken to accommodate for billing periods. For example,
+   * when a new billing period begins we must reset the accumulating values to their billing period initial value
+   * (usually zero).
    *
-   * - A2. As a byproduct of the above, the user credits total is updated.
-   *
-   *       For this, we just use the chargeable wallet entries created from action A1. Non-chargeable wallet
-   *       entries are ignored completely.
-   *
-   * - A3. Relevant resource state (some value) is updated. For example, the total bandwidth up value is
-   *       increased by the amount.
-   *
-   *       Since a resource event records the changed value for the resource, we can easily update the current
-   *       value for the resource.
    *
    * Under ideal circumstances, whenever a resource event arrives, we immediately process it and, so, the steps
-   * described in the above actions are what it only takes to get to the new state.
+   * described in the above actions are what it only takes to get to the new user state.
    *
    * But it may so happen that the above event processing procedure may be interrupted. For example, an Aquarium
    * component or external dependency may fail. In such cases, when a resource event arrives we cannot safely assume
@@ -239,72 +190,41 @@ class UserActor extends AquariumActor
    * - For each arriving resource event, it is possible that there exist events that arrived previously but which
    *   have not been partially or fully processed.
    *
-   * So, on arrival of a new event, we need to search our event and wallet stores to find those unprocessed events
-   * and process them in succession, as if they are newly arrived.
-   *
-   * In effect and in the most general case, we never process one event at a time but more than one resource
-   * events. Their processing order is the ascending order of their `occurredMillis` attribute.
-   * ==> TODO: Verify that this semantics of the processing order is correct.
-   *
-   *
-   * We will need the following algorithms:
-   *
-   * Algorithm W1: Given an non "On/Off" resource event, generate its wallet entries.
-   *
-   * Algorithm W2: Given an "Off" resource event and the corresponding "On" resource event, generate
-   *               the corresponding wallet entries.
-   *
-   * Algorithm OnOff: Given an "Off" resource event, find its corresponding "On" resource event.
-   *
-   * Algorithm F: Given a newly arrived resource event, find the exact list of all unprocessed events up to the new
-   *              one.
-   * Algorithm P: Process a resource event as if it is the most recent unprocessed one.
-   *
-   * The implementations are as follows:
-   *
-   * ============
-   * Algorithm W1
-   * ============
-   * - Input
-   *   - A non-"on/off" resource event
-   *
-   * - Output
-   *   - The respective wallet entries
-   *
-   * - Implementation
-   *   TODO: This is done in Accounting.chargeEvents
-   *
-   * ============
-   * Algorithm W2
-   * ============
-   * - Input
-   *   - An "off" resource event
-   *   - The corresponding "on" resource event
-   *
-   * - Output
-   *   - The respective wallet entries
-   *
-   * - Implementation
-   *   TODO: Trivial
-   *
-   * ===============
-   * Algorithm OnOff
-   * ===============
-   * - Input
-   *
-   * ===========
-   * Algorithm F
-   * ===========
-   * Input:  A resource event (e).
-   * Output: A list (l) of all unprocessed resource events up to (e).
-   *
-   *
+   * So, on arrival of a new event, we need to to find those unprocessed events and process them in succession,
+   * as if they are newly arrived. In effect and in the most general case, we never process one resource event at a
+   * time but more than one resource events.
+   * ==> TODO: What is their processing order? Normally, the total order (in the mathematical sense) imposed by
+   *     TODO: `occurredMillis`
+   *
+   * We distinguish event processing in two categories:
+   * - P1. Billing period event processing.
+   *   The goal is to compute the exact user state and relevant accounting info for this period.
+   *   Given a billing period (start and stop dates), we find ALL resource events that occurred withing that period
+   *   and compute:
+   *   - A series of accounting entries that associate resource events to credits
+   *   - The user state at the end of the billing period
+   *   The algorithm runs above actions A1 and A2 on each event.
+   *
+   * - P2. Realtime event processing.
+   *   This is the event processing that takes place in realtime, whenever a new resource event arrives.
+   *   The goal is to be able to have a near-accurate realtime user state. We say near-accurate instead of accurate,
+   *   because events may come out of order and, for complexity reasons, we want to avoid keeping the whole
+   *   realtime processing state history.
+   *
+   * P1 is rather straightforward, since the resource event DB can be queried to give us the events in
+   * `occurredMillis` total order. The we can feed these events to a calculation engine that takes actions A1 and A2
+   * directly.
+   *
+   * The implementation of P2 is a bit more involved. P2 is needed so as to be able to answer, in realtime,
+   * about the credit status for a User. As a first approximation, instead of P2,
+   * we run P1 each time we need to compute the realtime state within a `freshness` threshold.
    *
    */
   private[this] def thisIsJustForTheDoc: Unit = {
 
   }
 
+
   /**
    * Find and process older resource events.
    *
@@ -469,7 +389,7 @@ class UserActor extends AquariumActor
     val userId = event.userId
     DEBUG("Creating user from state %s", event)
     val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId) match {
+    usersDB.findLatestUserState(userId) match {
       case Just(userState) ⇒
         WARN("User already created, state = %s".format(userState))
       case failed @ Failed(e, m) ⇒
@@ -484,6 +404,8 @@ class UserActor extends AquariumActor
         } else {
           this._userState = UserState(
             userId,
+            0,
+            0,
             ActiveSuspendedSnapshot(event.isStateActive, now),
             CreditSnapshot(0, now),
             AgreementSnapshot(agreementOpt.get.name, now),
@@ -528,9 +450,9 @@ class UserActor extends AquariumActor
   /**
    * Try to load from the DB the latest known info (snapshot data) for the given user.
    */
-  private[this] def findUserState(userId: String): Maybe[UserState] = {
+  private[this] def loadLatestUserState(userId: String): Maybe[UserState] = {
     val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId)
+    usersDB.findLatestUserState(userId)
   }
 
   /**
@@ -541,7 +463,7 @@ class UserActor extends AquariumActor
    */
   private[this] def ensureUserState(): Unit = {
     if(null eq this._userState) {
-      findUserState(this._userId) match {
+      loadLatestUserState(this._userId) match {
         case Just(userState) ⇒
           DEBUG("Loaded user state %s from DB", userState)
           //TODO: May be out of sync with the event store, rebuild it here
@@ -557,6 +479,8 @@ class UserActor extends AquariumActor
     }
   }
 
+  private[this] def haveUserState = this._userState ne null
+
   /**
    * Replay the event log for all events that affect the user state, starting
    * from the provided time instant.
@@ -606,6 +530,8 @@ class UserActor extends AquariumActor
 
     this._userState = UserState(
       _userId,
+      0,
+      0,
       ActiveSuspendedSnapshot(false, now),
       CreditSnapshot(0, now),
       AgreementSnapshot(agreement.get.name, now),
@@ -720,6 +646,24 @@ class UserActor extends AquariumActor
         ERROR("Received %s but my userId = %s".format(m, this._userId))
         // TODO: throw an exception here
       } else {
+        if(!haveUserState) {
+          // 1. try to load user state from snapshots
+          loadLatestUserState(userId) match {
+            case Just(latestUserState) ⇒
+              DEBUG("Found latest user state %s".format(latestUserState))
+              // Need to check how stale this is
+              val a = latestUserState
+            case Failed(e, m) ⇒
+              ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
+            case NoVal ⇒
+              //TODO: Rebuild actor state here.
+              rebuildState(0)
+              WARN("Request for unknown (to Aquarium) user")
+          }
+        }
+        
+        
+        
         // This is the big party.
         // Get the user state, if it exists and make sure it is not stale.
         ensureUserState()