--- /dev/null
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.user
+
+import gr.grnet.aquarium.store.ResourceEventStore
+
+/**
+ * Helpers that build and compute `UserState` values.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+object UserStateComputations {
+ /**
+ * Creates an initial `UserState` for the given user.
+ *
+ * Every argument that is fed from the local `now` value is `0L`, the credit amount is `0`,
+ * the first argument of `ActiveSuspendedSnapshot` is `false` and all list-valued snapshots are empty.
+ *
+ * @param userId the id stored in the new state
+ * @param agreementName the name passed to `AgreementSnapshot`; defaults to `"default"`
+ * @return the newly constructed `UserState`
+ */
+ def createBlankState(userId: String, agreementName: String = "default") = {
+ // NOTE: despite its name, `now` is the constant zero and not the current wall-clock time.
+ val now = 0L
+ UserState(
+ userId,
+ // NOTE(review): the meaning of the next two positional arguments is not visible here;
+ // confirm against the UserState definition.
+ now,
+ now,
+ ActiveSuspendedSnapshot(false, now),
+ CreditSnapshot(0, now),
+ AgreementSnapshot(agreementName, now),
+ RolesSnapshot(List(), now),
+ PaymentOrdersSnapshot(Nil, now),
+ OwnedGroupsSnapshot(Nil, now),
+ GroupMembershipsSnapshot(Nil, now),
+ OwnedResourcesSnapshot(List(), now)
+ )
+ }
+
+ /**
+ * Folds the resource events of a period over an initial user state.
+ *
+ * The events are obtained from `rcEventStore.findResourceEventsForPeriod` for the user id of
+ * `initialUserState`. The fold step currently returns its accumulator unchanged, so the
+ * events do not yet influence the result.
+ *
+ * @param initialUserState the state the fold starts from; also supplies the user id
+ * @param startMillis period start, passed as-is to the event store
+ * @param stopMillis period stop, passed as-is to the event store
+ * @param rcEventStore the store queried for the resource events
+ * @return the result of the fold, which at present is `initialUserState` itself
+ */
+ def computeBillingState(initialUserState: UserState, startMillis: Long, stopMillis: Long,
+ rcEventStore: ResourceEventStore): UserState = {
+
+ val userId = initialUserState.userId
+ val rcEvents = rcEventStore.findResourceEventsForPeriod(userId, startMillis, stopMillis)
+
+ rcEvents.foldLeft[UserState](initialUserState) {
+ case (userState, rcEvent) ⇒
+ // Placeholder step: `rcEvent` is ignored and the accumulated state is passed through.
+ userState
+ }
+ }
+}
\ No newline at end of file
justProcessTheResourceEvent(resourceEvent, "ACTUAL")
}
-
- /**
- * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
- */
- def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
- Nil
- }
-
- def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
- Nil
- }
-
/**
- * Find resource events that precede the given one and are unprocessed.
- */
- private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
- if(rcEvent.isOnOffEvent(policy)) {
- findOlderResourceEventsForOnOff(rcEvent, policy)
- } else {
- findOlderResourceEventsForOther(rcEvent, policy)
- }
- }
-
- /**
- * Terminology:
+ * Terminology
*
* - Credits
* The analog of money. Credits are the `universal money` within Aquarium.
* An owner of resources and credits.
*
* - Resource event
- * An event that is generated by a system, which is responsible for the resource and describes a
- * state change for the resource. In particular, a resource event records the time when that state
- * change occurred (`occurredMillis` attribute) and the changed value (`value` attribute).
+ * An event that is generated by a system, which is responsible for the resource.
+ * The resource event describes a state change for the resource. In particular, a resource event records the time
+ * when that state change occurred (`occurredMillis` attribute) and the changed value (`value` attribute).
*
* - Resource event store
* A datatabase, in the general sense, where resource events are stored.
* - "Off" resource event
* An "on/off" event which actually records the "off" state for the particular resource it refers to.
*
- * - Wallet entry
- * The entity that describes an accounting event and its corresponding credit value.
- * For example, downloading files uses the download bandwidth and this costs credits. The exact
- * cost, which is determined using the corresponding cost policy of the event, is recorded
- * in one or more wallet entries.
- *
- * - Finalized wallet entry
- * It is a wallet entry whose recorded credits (its credit value) can be taken into account for
- * billing the respective user (the resource owner).
- * When we say that a wallet entry is in a finalized state, we mean that it is a finalized wallet entry, as
- * described above.
- *
- * - Pending or Non-finalized wallet entry
- * It is a wallet entry whose recorded credits (probably having a zero value) cannot be taken into
- * account for billing the respective resource owner. Pending wallet entries are introduced to handle
- * the "on/off" resource events.
- * When we say that a wallet entry is in/having a pending state, we mean that it is in a pending or
- * non-finalized wallet entry, as described above.
- *
- * - Wallet store
- * A database, in the general sense, where wallet entries are stored.
- *
* - User Bill
* A, usually, periodic statement of how many credits the user has consumed.
* It may contain detailed analysis that relates consumed credits to resources.
*
- * - Billable or Chargeable wallet entry
- * A wallet entry that can participate in the creation of the user Bill.
- * A wallet entry is billable if and only if it is finalized.
- *
- * - Processed resource event
- * A resource event which has generated wallet entries.
- * We may also designate a processed resource event as `partially processed` in order to distinguish it
- * from a `fully processed` resource event.
- *
- * - Fully processed resource event
- * A resource event which has no pending wallet entries or has no pending wallet entries with corresponding new
- * and finalized wallet entries (the meaning of the `corresponding` wallet entries will become
- * obvious in action A2 below).
- *
- *
- * When an event is processed, there are three possible actions as an outcome, namely A1, A2, A3:
- *
- * - A1. Wallet entries are created. This happens *always*. Each resource event may create more than one
- * wallet entries. The credit value associated with each wallet entry may be zero credits but we
- * always generate wallet entries even those with zero credit values.
- *
- * We say that the event that causes the generation of wallet entries is the `generator` event.
- *
- * The semantics of wallet entry creation depend on whether the original event is an "on/off" event
- * or not.
- *
- * - If the event is an "on/off" event, then
- * - If it is an "on" event, then one and only one new wallet entry is created having a pending state.
- * The credit value of this wallet entry is zero and its `occurredMillis` attribute is the same
- * as the `occurredMillis` attribute of the generator event.
- *
- * So what we actually do in this case is to record the "on" state with a non-billable wallet entry.
- * ==> TODO: How is tis related to the (later) expecting "off" state and corresponding (to the "off"
- * TODO: state) wallet entries?
- *
- * - If it is an "off" event, then we need to find the corresponding "on" event.
- * As long as we have the corresponding "on" event, then based on the relative timespan between
- * the two "on/off" events and the corresponding resource policy we compute a series of *new*
- * and *finalized* wallet entries. Each new wallet entry has an `occurredMillis` attribute equal to the
- * `occurredMillis` attribute of the original, pending, wallet entry. We then say that the new
- * wallet entries _correspond_ to the original, pending wallet entry.
- * ==> TODO: verify that the semantics of `occurredMillis` for the new wallet entries, as defined
- * TODO: above, are correct.
- *
- * ==> TODO: What do we do with the old, pending wallet entry? Do we keep it or do we delete it?
- *
- * The new wallet entries are generated according to Algorithm W2.
- *
- * - If the event is not an "on/off" event then, by design, it can directly generate one or more
- * finalized wallet entries. These wallet entries are generated by Algorithm W1.
+ * - Billing period
+ * A time period at the end of which we issue a user bill.
+ * A billing period is made of a starting date and a duration that is a multiple of a week.
+ * A usual billing period starts on a particular month date (eg. 3rd) and lasts for a month.
+ * Each resource type designates what happens to its accumulated value (if any) at the beginning of the
+ * billing period. Usually, at the beginning of the billing period, the accumulating amounts of resources are set
+ * to zero. In the general case, there is a function that tells us exactly to which value to set the
+ * accumulating amount. For example, for a monthly billing period, the total uploading bandwidth is reset to zero
+ * every month.
+ *
+ * - User state
+ * The user state is made of the following distinct parts (the first two can be integrated but not our concern
+ * right now):
+ * - User credit state, that is the total user credits
+ * - User resource state, that is:
+ * - For each resource type (owned by the user)
+ * - For each resource instance (owned by the user)
+ * - The particular accumulating value associated with the instance.
+ * For example, regarding uploading bandwidth, the total uploading bandwidth in MB.
+ * - Processing state, made of:
+ * - The `occurredMillis` of the last processed resource event
+ * - The `id` of the last processed resource event
+ * ==> TODO: do we need this?
+ *
+ * - Last known user state
+ * At periodic intervals, the current user state snapshot is saved. The most recent record of such user state is
+ * the last known user state.
+ *
+ * - Resource event processing
+ * The procedure by which a resource event leads to state changes of the user state.
+ *
+ * We process a resource event by taking the following actions:
+ *
+ * - A1. Compute the relevant changes in the user resource state. In particular:
+ * - If the resource event is "on/off"
+ * - If it is "on" then ignore it
+ * - If it is "off" then
+ * - Find the previous relevant "on" event
+ * - Use the "on/off" pair to compute the respective changes in the user resource state
+ * - If the resource is not "on/off"
+ * - Use it to compute the respective changes in the user resource state.
+ * - A2. Compute the relevant changes in the user credit state.
+ * - If the resource is "on/off"
+ * - If this is an "on" event, ignore it
+ * ==> TODO: could make "on" events part of the user state, so as to provisionally compute
+ * TODO: credit diffs as if hypothetical "off" events arrive at the moment of
+ * TODO: state diff calculation.
+ * - If it is an "off" event
+ * - Find the previous relevant "on" event
+ * - Use the "on/off" pair to compute the respective changes in the user credit state
+ * - If the resource is continuous...
+ * - If the resource is discrete...
+ *
+ * For Step A1 above, care must be taken to accommodate for billing periods. For example,
+ * when a new billing period begins we must reset the accumulating values to their billing period initial value
+ * (usually zero).
*
- * - A2. As a byproduct of the above, the user credits total is updated.
- *
- * For this, we just use the chargeable wallet entries created from action A1. Non-chargeable wallet
- * entries are ignored completely.
- *
- * - A3. Relevant resource state (some value) is updated. For example, the total bandwidth up value is
- * increased by the amount.
- *
- * Since a resource event records the changed value for the resource, we can easily update the current
- * value for the resource.
*
* Under ideal circumstances, whenever a resource event arrives, we immediately process it and, so, the steps
- * described in the above actions are what it only takes to get to the new state.
+ * described in the above actions are all it takes to get to the new user state.
*
* But it may so happen that the above event processing procedure may be interrupted. For example, an Aquarium
* component or external dependency may fail. In such cases, when a resource event arrives we cannot safely assume
* - For each arriving resource event, it is possible that there exist events that arrived previously but which
* have not been partially or fully processed.
*
- * So, on arrival of a new event, we need to search our event and wallet stores to find those unprocessed events
- * and process them in succession, as if they are newly arrived.
- *
- * In effect and in the most general case, we never process one event at a time but more than one resource
- * events. Their processing order is the ascending order of their `occurredMillis` attribute.
- * ==> TODO: Verify that this semantics of the processing order is correct.
- *
- *
- * We will need the following algorithms:
- *
- * Algorithm W1: Given an non "On/Off" resource event, generate its wallet entries.
- *
- * Algorithm W2: Given an "Off" resource event and the corresponding "On" resource event, generate
- * the corresponding wallet entries.
- *
- * Algorithm OnOff: Given an "Off" resource event, find its corresponding "On" resource event.
- *
- * Algorithm F: Given a newly arrived resource event, find the exact list of all unprocessed events up to the new
- * one.
- * Algorithm P: Process a resource event as if it is the most recent unprocessed one.
- *
- * The implementations are as follows:
- *
- * ============
- * Algorithm W1
- * ============
- * - Input
- * - A non-"on/off" resource event
- *
- * - Output
- * - The respective wallet entries
- *
- * - Implementation
- * TODO: This is done in Accounting.chargeEvents
- *
- * ============
- * Algorithm W2
- * ============
- * - Input
- * - An "off" resource event
- * - The corresponding "on" resource event
- *
- * - Output
- * - The respective wallet entries
- *
- * - Implementation
- * TODO: Trivial
- *
- * ===============
- * Algorithm OnOff
- * ===============
- * - Input
- *
- * ===========
- * Algorithm F
- * ===========
- * Input: A resource event (e).
- * Output: A list (l) of all unprocessed resource events up to (e).
- *
- *
+ * So, on arrival of a new event, we need to find those unprocessed events and process them in succession,
+ * as if they are newly arrived. In effect and in the most general case, we never process one resource event at a
+ * time but more than one resource events.
+ * ==> TODO: What is their processing order? Normally, the total order (in the mathematical sense) imposed by
+ * TODO: `occurredMillis`
+ *
+ * We distinguish event processing in two categories:
+ * - P1. Billing period event processing.
+ * The goal is to compute the exact user state and relevant accounting info for this period.
+ * Given a billing period (start and stop dates), we find ALL resource events that occurred within that period
+ * and compute:
+ * - A series of accounting entries that associate resource events to credits
+ * - The user state at the end of the billing period
+ * The algorithm runs above actions A1 and A2 on each event.
+ *
+ * - P2. Realtime event processing.
+ * This is the event processing that takes place in realtime, whenever a new resource event arrives.
+ * The goal is to be able to have a near-accurate realtime user state. We say near-accurate instead of accurate,
+ * because events may come out of order and, for complexity reasons, we want to avoid keeping the whole
+ * realtime processing state history.
+ *
+ * P1 is rather straightforward, since the resource event DB can be queried to give us the events in
+ * `occurredMillis` total order. Then we can feed these events to a calculation engine that takes actions A1 and A2
+ * directly.
+ *
+ * The implementation of P2 is a bit more involved. P2 is needed so as to be able to answer, in realtime,
+ * about the credit status for a User. As a first approximation, instead of P2,
+ * we run P1 each time we need to compute the realtime state within a `freshness` threshold.
*
*/
private[this] def thisIsJustForTheDoc: Unit = {
}
+
/**
* Find and process older resource events.
*
val userId = event.userId
DEBUG("Creating user from state %s", event)
val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId) match {
+ usersDB.findLatestUserState(userId) match {
case Just(userState) ⇒
WARN("User already created, state = %s".format(userState))
case failed @ Failed(e, m) ⇒
} else {
this._userState = UserState(
userId,
+ 0,
+ 0,
ActiveSuspendedSnapshot(event.isStateActive, now),
CreditSnapshot(0, now),
AgreementSnapshot(agreementOpt.get.name, now),
/**
* Try to load from the DB the latest known info (snapshot data) for the given user.
*/
- private[this] def findUserState(userId: String): Maybe[UserState] = {
+ private[this] def loadLatestUserState(userId: String): Maybe[UserState] = {
val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId)
+ usersDB.findLatestUserState(userId)
}
/**
*/
private[this] def ensureUserState(): Unit = {
if(null eq this._userState) {
- findUserState(this._userId) match {
+ loadLatestUserState(this._userId) match {
case Just(userState) ⇒
DEBUG("Loaded user state %s from DB", userState)
//TODO: May be out of sync with the event store, rebuild it here
}
}
+ /** `true` when `_userState` is not `null` (reference comparison via `ne`). */
+ private[this] def haveUserState = this._userState ne null
+
/**
* Replay the event log for all events that affect the user state, starting
* from the provided time instant.
this._userState = UserState(
_userId,
+ 0,
+ 0,
ActiveSuspendedSnapshot(false, now),
CreditSnapshot(0, now),
AgreementSnapshot(agreement.get.name, now),
ERROR("Received %s but my userId = %s".format(m, this._userId))
// TODO: throw an exception here
} else {
+ if(!haveUserState) {
+ // 1. try to load user state from snapshots
+ loadLatestUserState(userId) match {
+ case Just(latestUserState) ⇒
+ DEBUG("Found latest user state %s".format(latestUserState))
+ // Need to check how stale this is
+ val a = latestUserState
+ case Failed(e, m) ⇒
+ ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
+ case NoVal ⇒
+ //TODO: Rebuild actor state here.
+ rebuildState(0)
+ WARN("Request for unknown (to Aquarium) user")
+ }
+ }
+
+
+
// This is the big party.
// Get the user state, if it exists and make sure it is not stale.
ensureUserState()