# Override the user store (if present, it will not be given by the store provider above)
#user.state.store.class=gr.grnet.aquarium.store.memory.MemStorede the event store (if present, it will not be given by the store provider above)
#resource.event.store.class=
-# Override the WalletEntry store (if present, it will not be given by the store provider above)
-#wallet.entry.store.class=
# Override the user event store (if present, it will not be given by the store provider above)
#user.event.store.class=
# Override the user event store (if present, it will not be given by the store provider above)
import java.util.concurrent.atomic.AtomicBoolean
import gr.grnet.aquarium.ResourceLocator._
import com.ckkloverdos.sys.SysProp
+import gr.grnet.aquarium.computation.UserStateComputations
+import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
/**
* This is the Aquarium entry point.
}
+ private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler
+
+ // FIXME: () ⇒ this ?
+ private[this] lazy val _userStateComputations = new UserStateComputations(() ⇒ this)
+
private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
/**
}
}
- private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
- // If there is a specific `IMStore` implementation specified in the
- // properties, then this implementation overrides the event store given by
- // `IMProvider`.
- props.get(Keys.wallet_entry_store_class) map {
- className ⇒
- val instance = newInstance[WalletEntryStore](className)
- logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
- instance
- }
- }
-
private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
props.get(Keys.policy_store_class) map {
className ⇒
stopServices()
}
+ def algorithmCompiler = _algorithmCompiler
+
+ def userStateComputations = _userStateComputations
+
def converters = _converters
def actorProvider = _actorProvider
}
}
- def walletStore = {
- _WalletEventStoreM match {
- case Just(es) ⇒ es
- case _ ⇒ storeProvider.walletEntryStore
- }
- }
-
def imEventStore = {
_imEventStoreM match {
case Just(es) ⇒ es
/**
* The class that implements the wallet entries store
*/
- final val wallet_entry_store_class = "wallet.entry.store.class"
-
- /**
- * The class that implements the wallet entries store
- */
final val policy_store_class = "policy.store.class"
import akka.config.Supervision.Temporary
import gr.grnet.aquarium.Aquarium
+import gr.grnet.aquarium.util.{shortClassNameOf, shortNameOfClass}
import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
import gr.grnet.aquarium.computation.data.IMStateSnapshot
import gr.grnet.aquarium.event.model.im.IMEventModel
import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
+import gr.grnet.aquarium.computation.NewUserState
+import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
/**
*
*/
class UserActor extends ReflectiveRoleableActor {
+ private[this] var _userID: String = "<?>"
private[this] var _imState: IMStateSnapshot = _
// private[this] var _userState: UserState = _
-// private[this] var _newUserState: NewUserState = _
+ private[this] var _newUserState: NewUserState = _
self.lifeCycle = Temporary
-// private[this] def _userID = this._newUserState.userID
private[this] def _shutmedown(): Unit = {
-// if(_haveUserState) {
-// UserActorCache.invalidate(_userID)
-// }
+ if(_haveUserState) {
+ UserActorCache.invalidate(_userID)
+ }
self.stop()
}
override protected def onThrowable(t: Throwable, message: AnyRef) = {
logChainOfCauses(t)
-// ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
+ ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
_shutmedown()
}
aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
-// private[this] def _haveUserState = {
-// this._newUserState ne null
-// }
+ private[this] def _haveUserState = {
+ this._newUserState ne null
+ }
private[this] def _haveIMState = {
this._imState ne null
this._imState = newState
}
- logger.debug("Recomputed %s".format(this._imState))
+ DEBUG("Recomputed %s = %s", shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
+ }
+
+ private[this] def createUserState(userID: String): Unit = {
}
def onInitializeUserState(event: InitializeUserState): Unit = {
- logger.debug("Got %s".format(event))
- createIMState(event.userID)
+ val userID = event.userID
+ this._userID = userID
+ DEBUG("Got %s", event)
+
+ createIMState(userID)
+ createUserState(userID)
}
private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
*/
def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
val imEvent = processEvent.imEvent
- val hadIMState = _haveIMState
-
- if(hadIMState) {
- if(this._imState.latestIMEvent.id == imEvent.id) {
- // This happens when the actor is brought to life, then immediately initialized, and then
- // sent the first IM event. But from the initialization procedure, this IM event will have
- // already been loaded from DB!
- logger.debug("Ignoring first %s after birth".format(imEvent.toDebugString))
- return
- }
- this._imState = this._imState.copyWithEvent(imEvent)
- } else {
- this._imState = IMStateSnapshot.initial(imEvent)
+ if(!_haveIMState) {
+ // This is an error. Should have been initialized from somewhere ...
+ throw new Exception("Got %s while being uninitialized".format(processEvent))
}
-// DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
+ if(this._imState.latestIMEvent.id == imEvent.id) {
+ // This happens when the actor is brought to life, then immediately initialized, and then
+ // sent the first IM event. But from the initialization procedure, this IM event will have
+ // already been loaded from DB!
+ DEBUG("Ignoring first %s after birth", imEvent.toDebugString)
+ return
+ }
+
+ this._imState = this._imState.copyWithEvent(imEvent)
+
+ DEBUG("Update %s = %s", shortClassNameOf(this._imState), this._imState)
+ }
+
+ def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+ val rcEvent = event.rcEvent
+
+ if(!_haveIMState) {
+ // This means the user has not been activated. So, we do not process any resource event
+ INFO("Not processing %s", rcEvent.toJsonString)
+ return
+ }
}
+
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
val userId = event.userID
// FIXME: Implement
// self reply GetUserStateResponse(userId, Right(this._userState))
}
- def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
- val rcEvent = event.rcEvent
-
- logger.info("Got\n{}", rcEvent.toJsonString)
+ private[this] def D_userID = {
+ this._userID
}
+ private[this] def DEBUG(fmt: String, args: Any*) =
+ logger.debug("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+
+ private[this] def INFO(fmt: String, args: Any*) =
+ logger.info("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+
+ private[this] def WARN(fmt: String, args: Any*) =
+ logger.warn("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
+
+ private[this] def ERROR(fmt: String, args: Any*) =
+ logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*)))
-// private[this] def D_userID = {
-// if(this._newUserState eq null)
-// if(this._imState eq null)
-// "<NOT INITIALIZED>"
-// else
-// this._imState.latestIMEvent.userID
-// else
-// this._newUserState.userID
-// }
-//
-// private[this] def DEBUG(fmt: String, args: Any*) =
-// logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-//
-// private[this] def INFO(fmt: String, args: Any*) =
-// logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-//
-// private[this] def WARN(fmt: String, args: Any*) =
-// logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-//
-// private[this] def ERROR(fmt: String, args: Any*) =
-// logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-//
-// private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
-// logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+ private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
+ logger.error("User[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
}
stateReferenceMillis: Long, // The time this state refers to
totalCredits: Double,
roleHistory: RoleHistory,
- agreementHistory: AgreementHistory
+ agreementHistory: AgreementHistory,
+ latestResourceEventID: String
)
object NewUserState {
userCreationMillis,
credits,
RoleHistory.initial(role, userCreationMillis),
- AgreementHistory.initial(agreement, userCreationMillis)
+ AgreementHistory.initial(agreement, userCreationMillis),
+ ""
)
}
}
\ No newline at end of file
import org.bson.types.ObjectId
-import gr.grnet.aquarium.AquariumInternalError
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
-import gr.grnet.aquarium.event.model.{WalletEntry, NewWalletEntry}
+import gr.grnet.aquarium.event.model.NewWalletEntry
import gr.grnet.aquarium.util.json.JsonSupport
import gr.grnet.aquarium.logic.accounting.dsl.DSLAgreement
-import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason, InitialUserStateSetup, IMEventArrival}
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import gr.grnet.aquarium.computation.data.{RoleHistory, AgreementHistoryItem, ResourceInstanceSnapshot, OwnedResourcesSnapshot, AgreementHistory, CreditSnapshot, LatestResourceEventsSnapshot, ImplicitlyIssuedResourceEventsSnapshot, IMStateSnapshot}
+import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason, InitialUserStateSetup}
+import gr.grnet.aquarium.computation.data.{RoleHistory, ResourceInstanceSnapshot, OwnedResourcesSnapshot, AgreementHistory, LatestResourceEventsSnapshot, ImplicitlyIssuedResourceEventsSnapshot}
/**
* A comprehensive representation of the User's state.
* @param isFullBillingMonthState
* @param theFullBillingMonth
* @param implicitlyIssuedSnapshot
- * @param billingMonthWalletEntries
- * @param outOfSyncWalletEntries
* @param latestResourceEventsSnapshot
- * @param billingPeriodResourceEventsCounter
* @param billingPeriodOutOfSyncResourceEventsCounter
- * @param creditsSnapshot
- * @param agreementsSnapshot
+ * @param agreementHistory
* @param ownedResourcesSnapshot
* @param newWalletEntries
* The wallet entries computed. Not all user states need to holds wallet entries,
* only those that refer to billing periods (end of billing period).
* @param lastChangeReason
* The [[gr.grnet.aquarium.computation.reason.UserStateChangeReason]] for which the usr state has changed.
- * @param totalEventsProcessedCounter
* @param parentUserStateId
* The `ID` of the parent state. The parent state is the one used as a reference point in order to calculate
* this user state.
implicitlyIssuedSnapshot: ImplicitlyIssuedResourceEventsSnapshot,
/**
- * So far computed wallet entries for the current billing month.
- */
- billingMonthWalletEntries: List[WalletEntry],
-
- /**
- * Wallet entries that were computed for out of sync events.
- * (for the current billing month ??)
- */
- outOfSyncWalletEntries: List[WalletEntry],
-
- /**
* The latest (previous) resource events per resource instance.
*/
latestResourceEventsSnapshot: LatestResourceEventsSnapshot,
/**
- * Counts the total number of resource events used to produce this user state for
- * the billing period recorded by `billingPeriodSnapshot`
- */
- billingPeriodResourceEventsCounter: Long,
-
- /**
* The out of sync events used to produce this user state for
* the billing period recorded by `billingPeriodSnapshot`
*/
billingPeriodOutOfSyncResourceEventsCounter: Long,
- imStateSnapshot: IMStateSnapshot,
- creditsSnapshot: CreditSnapshot,
- agreementsSnapshot: AgreementHistory,
+
+ totalCredits: Double,
+
+ roleHistory: RoleHistory,
+
+ agreementHistory: AgreementHistory,
+
ownedResourcesSnapshot: OwnedResourcesSnapshot,
+
newWalletEntries: List[NewWalletEntry],
occurredMillis: Long, // The time fro which this state is relevant
+
// The last known change reason for this userState
lastChangeReason: UserStateChangeReason = NoSpecificChangeReason,
- totalEventsProcessedCounter: Long = 0L,
// The user state we used to compute this one. Normally the (cached)
// state at the beginning of the billing period.
parentUserStateId: Option[String] = None,
- _id: ObjectId = new ObjectId()
+ _id: String = new ObjectId().toString
) extends JsonSupport {
def idOpt: Option[String] = _id match {
// def userCreationFormatedDate = new MutableDateCalc(userCreationMillis).toString
def findDSLAgreementForTime(at: Long): Option[DSLAgreement] = {
- agreementsSnapshot.findForTime(at)
+ agreementHistory.findForTime(at)
}
def findResourceInstanceSnapshot(resource: String, instanceId: String): Option[ResourceInstanceSnapshot] = {
final val userID = "userID"
}
- def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
- if(!imEvent.isCreateUser) {
- throw new AquariumInternalError(
- "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
- }
-
- val userID = imEvent.userID
- val userCreationMillis = imEvent.occurredMillis
-
- UserState(
- true,
- userID,
- userCreationMillis,
- 0L,
- false,
- null,
- ImplicitlyIssuedResourceEventsSnapshot(List()),
- Nil,
- Nil,
- LatestResourceEventsSnapshot(List()),
- 0L,
- 0L,
- IMStateSnapshot(true, imEvent, RoleHistory.Empty),
- CreditSnapshot(credits),
- AgreementHistory(List(AgreementHistoryItem(agreementName, userCreationMillis))),
- OwnedResourcesSnapshot(Nil),
- Nil,
- userCreationMillis,
- InitialUserStateSetup
- )
- }
-
def createInitialUserState(userID: String,
userCreationMillis: Long,
- isActive: Boolean,
- credits: Double,
- roleNames: List[String] = List(),
- agreementName: String = DSLAgreement.DefaultAgreementName) = {
- val now = userCreationMillis
-
+ totalCredits: Double,
+ initialRole: String,
+ initialAgreement: String) = {
UserState(
true,
userID,
0L,
false,
null,
- ImplicitlyIssuedResourceEventsSnapshot(List()),
- Nil,
- Nil,
- LatestResourceEventsSnapshot(List()),
- 0L,
+ ImplicitlyIssuedResourceEventsSnapshot.Empty,
+ LatestResourceEventsSnapshot.Empty,
0L,
- IMStateSnapshot(
- true,
- StdIMEvent(
- "",
- now, now, userID,
- "",
- isActive, roleNames.headOption.getOrElse("default"),
- "1.0",
- IMEventModel.EventTypeNames.create, Map()),
- RoleHistory.Empty
- ),
- CreditSnapshot(credits),
- AgreementHistory(List(AgreementHistoryItem(agreementName, userCreationMillis))),
- OwnedResourcesSnapshot(Nil),
+ totalCredits,
+ RoleHistory.initial(initialRole, userCreationMillis),
+ AgreementHistory.initial(initialAgreement, userCreationMillis),
+ OwnedResourcesSnapshot.Empty,
Nil,
- now,
+ userCreationMillis,
InitialUserStateSetup
)
}
def createInitialUserStateFrom(us: UserState): UserState = {
createInitialUserState(
- us.imStateSnapshot.latestIMEvent,
- us.creditsSnapshot.creditAmount,
- us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
+ us.userID,
+ us.userCreationMillis,
+ us.totalCredits,
+ us.roleHistory.firstRoleName.getOrElse("default"), // FIXME What is the default?
+ us.agreementHistory.firstAgreementName.getOrElse("default") // FIXME What is the default?
+ )
}
}
package gr.grnet.aquarium.computation
import scala.collection.mutable
-import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
import gr.grnet.aquarium.logic.accounting.Accounting
-import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
import gr.grnet.aquarium.computation.data._
import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, UserStateChangeReason}
import gr.grnet.aquarium.event.model.NewWalletEntry
import gr.grnet.aquarium.event.model.resource.ResourceEventModel
+import gr.grnet.aquarium.{Aquarium, AquariumInternalError, AquariumException}
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class UserStateComputations extends Loggable {
+class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
+
+ protected lazy val aquarium = _aquarium()
+ protected lazy val storeProvider = aquarium.storeProvider
+ protected lazy val accounting = new Accounting {}
+ protected lazy val algorithmCompiler = aquarium.algorithmCompiler
+ protected lazy val policyStore = storeProvider.policyStore
+ protected lazy val userStateStore = storeProvider.userStateStore
+ protected lazy val resourceEventStore = storeProvider.resourceEventStore
def findUserStateAtEndOfBillingMonth(userId: String,
billingMonthInfo: BillingMonthInfo,
- storeProvider: StoreProvider,
currentUserState: UserState,
defaultResourcesMap: DSLResourcesMap,
- accounting: Accounting,
- algorithmCompiler: CostPolicyAlgorithmCompiler,
calculationReason: UserStateChangeReason,
clogOpt: Option[ContextualLogger] = None): UserState = {
doFullMonthlyBilling(
userId,
billingMonthInfo,
- storeProvider,
currentUserState,
defaultResourcesMap,
- accounting,
- algorithmCompiler,
calculationReason,
Some(clog))
}
- val userStateStore = storeProvider.userStateStore
- val resourceEventStore = storeProvider.resourceEventStore
-
val userCreationMillis = currentUserState.userCreationMillis
val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
val billingMonthStartMillis = billingMonthInfo.startMillis
def processResourceEvent(startingUserState: UserState,
userStateWorker: UserStateWorker,
currentResourceEvent: ResourceEventModel,
- policyStore: PolicyStore,
stateChangeReason: UserStateChangeReason,
billingMonthInfo: BillingMonthInfo,
walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
- algorithmCompiler: CostPolicyAlgorithmCompiler,
clogOpt: Option[ContextualLogger] = None): UserState = {
val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
val theInstanceId = currentResourceEvent.safeInstanceId
val theValue = currentResourceEvent.value
- val accounting = userStateWorker.accounting
val resourcesMap = userStateWorker.resourcesMap
val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
} else {
val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
- val oldCredits = _workingUserState.creditsSnapshot.creditAmount
+ val oldCredits = _workingUserState.totalCredits
// A. Compute new resource instance accumulating amount
val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
// B. Compute new wallet entries
- clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
- val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
+ clog.debug("agreementsSnapshot = %s", _workingUserState.agreementHistory)
+ val alltimeAgreements = _workingUserState.agreementHistory.agreementNamesByTimeslot
// clog.debug("Computing full chargeslots")
val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
}
_workingUserState = _workingUserState.copy(
- creditsSnapshot = CreditSnapshot(newCredits),
- stateChangeCounter = _workingUserState.stateChangeCounter + 1,
- totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+ totalCredits = newCredits,
+ stateChangeCounter = _workingUserState.stateChangeCounter + 1
)
}
}
def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
startingUserState: UserState,
userStateWorker: UserStateWorker,
- policyStore: PolicyStore,
stateChangeReason: UserStateChangeReason,
billingMonthInfo: BillingMonthInfo,
walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
- algorithmCompiler: CostPolicyAlgorithmCompiler,
clogOpt: Option[ContextualLogger] = None): UserState = {
var _workingUserState = startingUserState
_workingUserState,
userStateWorker,
currentResourceEvent,
- policyStore,
stateChangeReason,
billingMonthInfo,
walletEntriesBuffer,
- algorithmCompiler,
clogOpt
)
}
def doFullMonthlyBilling(userId: String,
billingMonthInfo: BillingMonthInfo,
- storeProvider: StoreProvider,
currentUserState: UserState,
defaultResourcesMap: DSLResourcesMap,
- accounting: Accounting,
- algorithmCompiler: CostPolicyAlgorithmCompiler,
calculationReason: UserStateChangeReason = NoSpecificChangeReason,
clogOpt: Option[ContextualLogger] = None): UserState = {
val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
userId,
billingMonthInfo.previousMonth,
- storeProvider,
currentUserState,
defaultResourcesMap,
- accounting,
- algorithmCompiler,
calculationReason.forPreviousBillingMonth,
clogSome
)
val startingUserState = previousBillingMonthUserState
- val userStateStore = storeProvider.userStateStore
- val resourceEventStore = storeProvider.resourceEventStore
- val policyStore = storeProvider.policyStore
val billingMonthStartMillis = billingMonthInfo.startMillis
val billingMonthEndMillis = billingMonthInfo.stopMillis
// NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
- val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
+ val userStateWorker = UserStateWorker.fromUserState(_workingUserState, defaultResourcesMap)
userStateWorker.debugTheMaps(clog)(rcDebugInfo)
allResourceEventsForMonth,
_workingUserState,
userStateWorker,
- policyStore,
calculationReason,
billingMonthInfo,
newWalletEntries,
- algorithmCompiler,
clogSome
)
LatestResourceEventsWorker.fromList(specialEvents),
ImplicitlyIssuedResourceEventsWorker.Empty,
IgnoredFirstResourceEventsWorker.Empty,
- userStateWorker.accounting,
userStateWorker.resourcesMap
)
theirImplicitEnds,
_workingUserState,
specialUserStateWorker,
- policyStore,
calculationReason,
billingMonthInfo,
newWalletEntries,
- algorithmCompiler,
clogSome
)
previousResourceEvents: LatestResourceEventsWorker,
implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
- accounting: Accounting,
resourcesMap: DSLResourcesMap) {
/**
}
object UserStateWorker {
- def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
+ def fromUserState(userState: UserState, resourcesMap: DSLResourcesMap): UserStateWorker = {
UserStateWorker(
userState.userID,
userState.latestResourceEventsSnapshot.toMutableWorker,
userState.implicitlyIssuedSnapshot.toMutableWorker,
IgnoredFirstResourceEventsWorker.Empty,
- accounting,
resourcesMap
)
}
case Nil => ()
}
- def agreementsByTimeslot: SortedMap[Timeslot, String] = {
- TreeMap(agreements.map(ag => (ag.timeslot, ag.name)): _*)
+ def agreementNamesByTimeslot: SortedMap[Timeslot, String] = {
+ TreeMap(agreements.map(ag ⇒ (ag.timeslot, ag.name)): _*)
+ }
+
+ def agreementsByTimeslot: SortedMap[Timeslot, AgreementHistoryItem] = {
+ TreeMap(agreements.map(ag ⇒ (ag.timeslot, ag)): _*)
}
/**
case None => None
}
}
+
+ /**
+ * Returns the first, chronologically, agreement.
+ */
+ def firstAgreement: Option[AgreementHistoryItem] = {
+ agreementsByTimeslot.valuesIterator.toList.lastOption
+ }
+
+ /**
+ * Returns the name of the first, chronologically, agreement.
+ */
+ def firstAgreementName: Option[String] = {
+ agreementNamesByTimeslot.valuesIterator.toList.lastOption
+ }
+
+ /**
+ * Returns the last, chronologically, agreement.
+ */
+ def lastAgreement: Option[AgreementHistoryItem] = {
+ agreementsByTimeslot.valuesIterator.toList.headOption
+ }
+
+ /**
+ * Returns the name of the last, chronologically, agreement.
+ */
+ def lastAgreementName: Option[String] = {
+ agreementNamesByTimeslot.valuesIterator.toList.headOption
+ }
}
object AgreementHistory {
}
}
+object ImplicitlyIssuedResourceEventsSnapshot {
+ final val Empty = ImplicitlyIssuedResourceEventsSnapshot(Nil)
+}
+
}
}
+object LatestResourceEventsSnapshot {
+ final val Empty = LatestResourceEventsSnapshot(Nil)
+}
(newOwnedResources, oldResourceInstanceOpt, newResourceInstance)
}
+}
+
+object OwnedResourcesSnapshot {
+ final val Empty = OwnedResourcesSnapshot(Nil)
}
\ No newline at end of file
* The head role is the most recent. The same rule applies for the tail.
*/
roles: List[RoleHistoryItem]) {
- def rolesByTimeslot: SortedMap[Timeslot, String] = {
- TreeMap(roles.map(role => (role.timeslot, role.name)): _*)
+
+ def roleNamesByTimeslot: SortedMap[Timeslot, String] = {
+ TreeMap(roles.map(role ⇒ (role.timeslot, role.name)): _*)
+ }
+
+ def rolesByTimeslot: SortedMap[Timeslot, RoleHistoryItem] = {
+ TreeMap(roles.map(role ⇒ (role.timeslot, role)): _*)
}
def copyWithRole(role: String, validFrom: Long) = {
RoleHistory(newItems)
}
+
+ /**
+ * Returns the first, chronologically, role.
+ */
+ def firstRole: Option[RoleHistoryItem] = {
+ rolesByTimeslot.valuesIterator.toList.lastOption
+ }
+
+ /**
+ * Returns the name of the first, chronologically, role.
+ */
+ def firstRoleName: Option[String] = {
+ roleNamesByTimeslot.valuesIterator.toList.lastOption
+ }
+
+ /**
+ * Returns the last, chronologically, role.
+ */
+ def lastRole: Option[RoleHistoryItem] = {
+ rolesByTimeslot.valuesIterator.toList.headOption
+ }
+
+ /**
+ * Returns the name of the last, chronologically, role.
+ */
+ def lastRoleName: Option[String] = {
+ roleNamesByTimeslot.valuesIterator.toList.headOption
+ }
}
object RoleHistory {
package gr.grnet.aquarium.logic.accounting
-import gr.grnet.aquarium.util.shortClassNameOf
import algorithm.CostPolicyAlgorithmCompiler
import dsl._
import collection.immutable.SortedMap
-import java.util.Date
-import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
-import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
+import com.ckkloverdos.maybe.{NoVal, Maybe, Just}
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
import gr.grnet.aquarium.store.PolicyStore
-import gr.grnet.aquarium.event.model.WalletEntry
-import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.util.date.MutableDateCalc
import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
import gr.grnet.aquarium.event.model.resource.ResourceEventModel
/**
- * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
- *
- * @param startMillis
- * @param stopMillis
- * @param algorithmDefinition
- * @param unitPrice
- * @param computedCredits The computed credits
- */
-case class Chargeslot(startMillis: Long,
- stopMillis: Long,
- algorithmDefinition: String,
- unitPrice: Double,
- computedCredits: Option[Double] = None) {
-
- override def toString = "%s(%s, %s, %s, %s, %s)".format(
- shortClassNameOf(this),
- new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
- new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
- unitPrice,
- computedCredits,
- algorithmDefinition
- )
-}
-
-/**
* Methods for converting accounting events to wallet entries.
*
* @author Georgios Gousios <gousiosg@gmail.com>
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-trait Accounting extends DSLUtils with Loggable {
+trait Accounting extends Loggable {
+ // TODO: favour composition over inheritance until we decide what to do with DSLUtils (and Accounting).
+ protected val dslUtils = new DSLUtils {}
/**
* Breaks a reference timeslot (e.g. billing period) according to policies and agreements.
*
val clog = ContextualLogger.fromOther(clogOpt, logger, "resolveEffectiveAlgorithmsAndPriceLists()")
// Note that most of the code is taken from calcChangeChunks()
- val alg = resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
- val pri = resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
+ val alg = dslUtils.resolveEffectiveAlgorithmsForTimeslot(alignedTimeslot, agreement)
+ val pri = dslUtils.resolveEffectivePricelistsForTimeslot(alignedTimeslot, agreement)
val chargeChunks = splitChargeChunks(alg, pri)
val algorithmByTimeslot = chargeChunks._1
val pricelistByTimeslot = chargeChunks._2
// This is it
val credits = execAlgorithm.apply(valueMap)
- chargeslot.copy(computedCredits = Some(credits))
+ chargeslot.copyWithCredits(credits)
}
val result = referenceTimeslot -> fullChargeslots
}
/**
- * Create a list of wallet entries by charging for a resource event.
- *
- * @param currentResourceEvent The resource event to create charges for
- * @param agreements The user's agreement names, indexed by their
- * applicability timeslot
- * @param previousAmount The current state of the resource
- * @param previousOccurred The last time the resource state was updated
- */
- def chargeEvent(currentResourceEvent: ResourceEventModel,
- agreements: SortedMap[Timeslot, String],
- previousAmount: Double,
- previousOccurred: Date,
- related: List[WalletEntry]): Maybe[List[WalletEntry]] = {
-
- assert(previousOccurred.getTime <= currentResourceEvent.occurredMillis)
- val occuredDate = new Date(currentResourceEvent.occurredMillis)
-
- /* The following makes sure that agreements exist between the start
- * and end days of the processed event. As agreement updates are
- * guaranteed not to leave gaps, this means that the event can be
- * processed correctly, as at least one agreement will be valid
- * throughout the event's life.
- */
- assert(
- agreements.keysIterator.exists {
- p => p.includes(occuredDate)
- } && agreements.keysIterator.exists {
- p => p.includes(previousOccurred)
- }
- )
-
- val t = Timeslot(previousOccurred, occuredDate)
-
- // Align policy and agreement validity timeslots to the event's boundaries
- val policyTimeslots = t.align(
- Policy.policies(previousOccurred, occuredDate).keysIterator.toList)
- val agreementTimeslots = t.align(agreements.keysIterator.toList)
-
- /*
- * Get a set of timeslot slices covering the different durations of
- * agreements and policies.
- */
- val aligned = alignTimeslots(policyTimeslots, agreementTimeslots)
-
- val walletEntries = aligned.map {
- x =>
- // Retrieve agreement from the policy valid at time of event
- val agreementName = agreements.find(y => y._1.contains(x)) match {
- case Some(x) => x
- case None => return Failed(new AccountingException(("Cannot find" +
- " user agreement for period %s").format(x)))
- }
-
- // Do the wallet entry calculation
- val entries = chargeEvent(
- currentResourceEvent,
- Policy.policy(x.from).findAgreement(agreementName._2).getOrElse(
- return Failed(new AccountingException("Cannot get agreement for %s".format()))
- ),
- previousAmount,
- previousOccurred,
- related,
- Some(x)
- ) match {
- case Just(x) => x
- case Failed(f) => return Failed(f)
- case NoVal => List()
- }
- entries
- }.flatten
-
- Just(walletEntries)
- }
-
- /**
- * Creates a list of wallet entries by applying the agreement provisions on
- * the resource state.
- *
- * @param event The resource event to create charges for
- * @param agr The agreement implementation to use
- * @param previousAmount The current state of the resource
- * @param previousOccurred The timestamp of the previous event
- * @param related Related wallet entries (TODO: should remove)
- * @param chargeFor The duration for which the charge should be done.
- * Should fall between the previous and current
- * resource event boundaries
- * @return A list of wallet entries, one for each
- */
- def chargeEvent(event: ResourceEventModel,
- agr: DSLAgreement,
- previousAmount: Double,
- previousOccurred: Date,
- related: List[WalletEntry],
- chargeFor: Option[Timeslot]): Maybe[List[WalletEntry]] = {
-
- // If chargeFor is not null, make sure it falls within
- // event time boundaries
- chargeFor.map{x => assert(true,
- Timeslot(previousOccurred, new Date(event.occurredMillis)))}
-
-// if (!event.validate())
-// return Failed(new AccountingException("Event not valid"))
-
- val policy = Policy.policy
- val dslResource = policy.findResource(event.resource) match {
- case Some(x) => x
- case None => return Failed(
- new AccountingException("No resource [%s]".format(event.resource)))
- }
-
- /* This is a safeguard against the special case where the last
- * resource state update, as marked by the lastUpdate parameter
- * is equal to the time of the event occurrence. This means that
- * this is the first time the resource state has been recorded.
- * Charging in this case only makes sense for discrete resources.
- */
- if (previousOccurred.getTime == event.occurredMillis) {
- dslResource.costPolicy match {
- case DiscreteCostPolicy => //Ok
- case _ => return Just(List())
- }
- }
-
- val creditCalculationValueM = dslResource.costPolicy.getValueForCreditCalculation(Just(previousAmount), event.value)
- val amount = creditCalculationValueM match {
- case failed @ Failed(_) ⇒
- return failed
- case Just(amount) ⇒
- amount
- case NoVal ⇒
- 0.0
- }
-
- // We don't do strict checking for all cases for OnOffPolicies as
- // above, since this point won't be reached in case of error.
- val isFinal = dslResource.costPolicy match {
- case OnOffCostPolicy =>
- OnOffPolicyResourceState(previousAmount) match {
- case OnResourceState => false
- case OffResourceState => true
- }
- case _ => true
- }
-
- /*
- * Get the timeslot for which this event will be charged. In case we
- * have a discrete resource, we do not really care for the time duration
- * of an event. To process all events in a uniform way, we create an
- * artificial timeslot lasting the minimum amount of time. In all other
- * cases, we first check whether a desired charge period passed as
- * an argument.
- */
- val timeslot = dslResource.costPolicy match {
- case DiscreteCostPolicy => Timeslot(new Date(event.occurredMillis - 1),
- new Date(event.occurredMillis))
- case _ => chargeFor match {
- case Some(x) => x
- case None => Timeslot(previousOccurred, new Date(event.occurredMillis))
- }
- }
-
- /*
- * The following splits the chargable timeslot into smaller timeslots to
- * comply with different applicability periods for algorithms and
- * pricelists defined by the provided agreement.
- */
- val chargeChunks = calcChangeChunks(agr, amount, dslResource, timeslot)
-
- val timeReceived = TimeHelpers.nowMillis()
-
- val rel = event.id :: related.map{x => x.sourceEventIDs}.flatten
-
- val entries = chargeChunks.map { c=>
- WalletEntry(
- id = CryptoUtils.sha1(c.id),
- occurredMillis = event.occurredMillis,
- receivedMillis = timeReceived,
- sourceEventIDs = rel,
- value = c.cost,
- reason = c.reason,
- userId = event.userID,
- resource = event.resource,
- instanceId = event.instanceID,
- finalized = isFinal
- )
- }
- Just(entries)
- }
-
- /**
- * Create a
- */
- def calcChangeChunks(agr: DSLAgreement, volume: Double,
- res: DSLResource, t: Timeslot): List[ChargeChunk] = {
-
- val alg = resolveEffectiveAlgorithmsForTimeslot(t, agr)
- val pri = resolveEffectivePricelistsForTimeslot(t, agr)
- val chunks = splitChargeChunks(alg, pri)
- val algChunked = chunks._1
- val priChunked = chunks._2
-
- assert(algChunked.size == priChunked.size)
-
- res.costPolicy match {
- case DiscreteCostPolicy => calcChargeChunksDiscrete(algChunked, priChunked, volume, res)
- case _ => calcChargeChunksContinuous(algChunked, priChunked, volume, res)
- }
- }
-
- /**
- * Get a list of charge chunks for discrete resources.
- */
- private[logic]
- def calcChargeChunksDiscrete(algChunked: Map[Timeslot, DSLAlgorithm],
- priChunked: Map[Timeslot, DSLPriceList],
- volume: Double, res: DSLResource): List[ChargeChunk] = {
- // In case of descrete resources, we only a expect a
- assert(algChunked.size == 1)
- assert(priChunked.size == 1)
- assert(algChunked.keySet.head.compare(priChunked.keySet.head) == 0)
-
- List(ChargeChunk(volume,
- algChunked.valuesIterator.next.algorithms.getOrElse(res, ""),
- priChunked.valuesIterator.next.prices.getOrElse(res, 0),
- algChunked.keySet.head, res))
- }
-
- /**
- * Get a list of charge chunks for continuous resources.
- */
- private[logic]
- def calcChargeChunksContinuous(algChunked: Map[Timeslot, DSLAlgorithm],
- priChunked: Map[Timeslot, DSLPriceList],
- volume: Double, res: DSLResource): List[ChargeChunk] = {
- algChunked.keysIterator.map {
- x =>
- ChargeChunk(volume,
- algChunked.get(x).get.algorithms.getOrElse(res, ""),
- priChunked.get(x).get.prices.getOrElse(res, 0), x, res)
- }.toList
- }
-
- /**
* Align charge timeslots between algorithms and pricelists. As algorithm
* and pricelists can have different effectivity periods, this method
* examines them and splits them as necessary.
}
}
}
-
-/**
- * Encapsulates a computation for a specific timeslot of
- * resource usage.
- */
-case class ChargeChunk(value: Double, algorithm: String,
- price: Double, when: Timeslot,
- resource: DSLResource) {
- assert(value > 0)
- assert(!algorithm.isEmpty)
- assert(resource != null)
-
- def cost(): Double =
- //TODO: Apply the algorithm, when we start parsing it
- resource.costPolicy match {
- case DiscreteCostPolicy =>
- value * price
- case _ =>
- value * price * when.hours
- }
-
- def reason(): String =
- resource.costPolicy match {
- case DiscreteCostPolicy =>
- "%f %s at %s @ %f/%s".format(value, resource.unit, when.from, price,
- resource.unit)
- case ContinuousCostPolicy =>
- "%f %s of %s from %s to %s @ %f/%s".format(value, resource.unit,
- resource.name, when.from, when.to, price, resource.unit)
- case OnOffCostPolicy =>
- "%f %s of %s from %s to %s @ %f/%s".format(when.hours, resource.unit,
- resource.name, when.from, when.to, price, resource.unit)
- }
-
- def id(): String =
- CryptoUtils.sha1("%f%s%f%s%s%d".format(value, algorithm, price, when.toString,
- resource.name, TimeHelpers.nowMillis()))
-}
-
-/** An exception raised when something goes wrong with accounting */
-class AccountingException(msg: String) extends AquariumException(msg)
* or implied, of GRNET S.A.
*/
-package gr.grnet.aquarium.computation
+package gr.grnet.aquarium.logic.accounting
+
+import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.util.date.MutableDateCalc
/**
- * Default implementation for [[gr.grnet.aquarium.computation.UserStateComputations]].
+ * Represents a timeslot together with the algorithm and unit price that apply for this particular timeslot.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-object DefaultUserStateComputations extends UserStateComputations
\ No newline at end of file
+
+case class Chargeslot(
+ startMillis: Long,
+ stopMillis: Long,
+ algorithmDefinition: String,
+ unitPrice: Double,
+ computedCredits: Option[Double] = None) {
+
+ def copyWithCredits(credits: Double) = {
+ copy(computedCredits = Some(credits))
+ }
+
+ override def toString = "%s(%s, %s, %s, %s, %s)".format(
+ shortClassNameOf(this),
+ new MutableDateCalc(startMillis).toYYYYMMDDHHMMSSSSS,
+ new MutableDateCalc(stopMillis).toYYYYMMDDHHMMSSSSS,
+ unitPrice,
+ computedCredits,
+ algorithmDefinition
+ )
+}
import gr.grnet.aquarium.Aquarium._
import java.io.{InputStream, FileInputStream, File}
import java.util.Date
-import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.util.Loggable
import java.util.concurrent.atomic.AtomicReference
-import gr.grnet.aquarium.Aquarium
import gr.grnet.aquarium.Aquarium.Keys
import com.ckkloverdos.maybe.{Failed, NoVal, Just}
import collection.immutable.{TreeMap, SortedMap}
+import gr.grnet.aquarium.{AquariumException, Aquarium}
+import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
/**
* Searches for and loads the applicable accounting policy
} match {
case Some(x) => x._2
case None =>
- throw new AccountingException("No valid policy for date: %s".format(at))
+ throw new AquariumException("No valid policy for date: %s".format(new MutableDateCalc(at)))
}
}
trait StoreProvider {
def userStateStore: UserStateStore
def resourceEventStore: ResourceEventStore
- def walletEntryStore: WalletEntryStore
def imEventStore: IMEventStore
def policyStore: PolicyStore
}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.store
-
-import java.util.Date
-import com.ckkloverdos.maybe.Maybe
-import gr.grnet.aquarium.event.model.WalletEntry
-
-/**
- * A store for Wallet entries.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-trait WalletEntryStore {
-
- def storeWalletEntry(entry: WalletEntry): Maybe[RecordID]
-
- def findWalletEntryById(id: String): Maybe[WalletEntry]
-
- def findUserWalletEntries(userId: String): List[WalletEntry]
-
- def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry]
-
- /**
- * Finds latest wallet entries with same timestamp.
- */
- def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]]
-
- /**
- * Find the previous entry in the user's wallet for the provided resource
- * instance id.
- */
- def findPreviousEntry(userId: String, resource: String,
- instanceId: String, finalized: Option[Boolean]): List[WalletEntry]
-
- def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry]
-}
\ No newline at end of file
package gr.grnet.aquarium.store.memory
import com.ckkloverdos.props.Props
-import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
+import com.ckkloverdos.maybe.{NoVal, Just}
import gr.grnet.aquarium.store._
import scala.collection.JavaConversions._
-import java.util.Date
import collection.mutable.ConcurrentMap
import java.util.concurrent.ConcurrentHashMap
import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
import gr.grnet.aquarium.Configurable
-import gr.grnet.aquarium.event.model.{WalletEntry, PolicyEntry}
+import gr.grnet.aquarium.event.model.PolicyEntry
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
import org.bson.types.ObjectId
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
import gr.grnet.aquarium.computation.UserState
+import gr.grnet.aquarium.util.Tags
/**
* An implementation of various stores that persists data in memory.
class MemStore extends UserStateStore
with Configurable with PolicyStore
with ResourceEventStore with IMEventStore
- with WalletEntryStore
with StoreProvider {
override type IMEvent = MemIMEvent
override type ResourceEvent = MemResourceEvent
- private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
-
private[this] var _userStates = List[UserState]()
private[this] var _policyEntries = List[PolicyEntry]()
private[this] var _resourceEvents = List[ResourceEvent]()
- private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
override def toString = {
val map = Map(
- "UserState" -> _userStates.size,
- "ResourceEvent" -> _resourceEvents.size,
- "IMEvent" -> imEventById.size,
- "PolicyEntry" -> _policyEntries.size,
- "WalletEntry" -> walletEntriesById.size
+ Tags.UserStateTag -> _userStates.size,
+ Tags.ResourceEventTag -> _resourceEvents.size,
+ Tags.IMEventTag -> imEventById.size,
+ "PolicyEntry" -> _policyEntries.size
)
"MemStore(%s)" format map
def resourceEventStore = this
- def walletEntryStore = this
-
def imEventStore = this
def policyStore = this
//+ UserStateStore
def insertUserState(userState: UserState): UserState = {
- _userStates = userState.copy(_id = new ObjectId()) :: _userStates
+ _userStates = userState.copy(_id = new ObjectId().toString) :: _userStates
userState
}
}
//- UserStateStore
- //- WalletEntryStore
- def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
- walletEntriesById.put(entry.id, entry)
- Just(RecordID(entry.id))
- }
-
- def findWalletEntryById(id: String): Maybe[WalletEntry] = {
- Maybe(walletEntriesById.apply(id))
- }
-
- def findUserWalletEntries(userId: String): List[WalletEntry] = {
- walletEntriesById.valuesIterator.filter(_.userId == userId).toList
- }
-
- def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
- walletEntriesById.valuesIterator.filter { we ⇒
- val receivedDate = we.receivedDate
-
- we.userId == userId &&
- ( (from before receivedDate) || (from == receivedDate) ) &&
- ( (to after receivedDate) || (to == receivedDate) )
- true
- }.toList
- }
-
- def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
-
- def findPreviousEntry(userId: String,
- resource: String,
- instanceId: String,
- finalized: Option[Boolean]): List[WalletEntry] = Nil
-
- def findWalletEntriesAfter(userID: String, from: Date): List[WalletEntry] = {
- walletEntriesById.valuesIterator.filter { we ⇒
- val occurredDate = we.occurredDate
-
- we.userId == userID &&
- ( (from before occurredDate) || (from == occurredDate) )
- }.toList
- }
- //- WalletEntryStore
-
//+ ResourceEventStore
def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
import gr.grnet.aquarium.event.model.resource.ResourceEventModel
import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.event.model.WalletEntry.{JsonNames ⇒ WalletJsonNames}
import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
-import java.util.Date
import gr.grnet.aquarium.logic.accounting.Policy
import com.mongodb._
import org.bson.types.ObjectId
import gr.grnet.aquarium.util._
import gr.grnet.aquarium.converter.Conversions
import gr.grnet.aquarium.computation.UserState
-import gr.grnet.aquarium.event.model.{ExternalEventModel, WalletEntry, PolicyEntry}
+import gr.grnet.aquarium.event.model.{ExternalEventModel, PolicyEntry}
/**
* Mongodb implementation of the various aquarium stores.
val password: String)
extends ResourceEventStore
with UserStateStore
- with WalletEntryStore
with IMEventStore
with PolicyStore
with Loggable {
private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION)
private[store] lazy val imEvents = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
- private[store] lazy val walletEntries = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
private[store] lazy val policyEntries = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
private[this] def getCollection(name: String): DBCollection = {
}
//- UserStateStore
- //+WalletEntryStore
- def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
- Maybe {
- MongoDBStore.storeAny[WalletEntry](
- entry,
- walletEntries,
- WalletJsonNames.id,
- (e) => e.id,
- MongoDBStore.jsonSupportToDBObject)
- }
- }
-
- def findWalletEntryById(id: String): Maybe[WalletEntry] = {
- MongoDBStore.findBy(WalletJsonNames.id, id, walletEntries, MongoDBStore.dbObjectToWalletEntry): Maybe[WalletEntry]
- }
-
- def findUserWalletEntries(userId: String) = {
- // TODO: optimize
- findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
- }
-
- def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
- val q = new BasicDBObject()
- // TODO: Is this the correct way for an AND query?
- q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
- q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
- q.put(WalletJsonNames.userId, userId)
-
- MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
- }
-
- def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
- val q = new BasicDBObject()
- q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
- q.put(WalletJsonNames.userId, userId)
-
- MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
- }
-
- def findLatestUserWalletEntries(userId: String) = {
- Maybe {
- val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
- val cursor = walletEntries.find().sort(orderBy)
-
- try {
- val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
- if(cursor.hasNext) {
- val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
- buffer += walletEntry
-
- var _previousOccurredMillis = walletEntry.occurredMillis
- var _ok = true
-
- while(cursor.hasNext && _ok) {
- val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
- var currentOccurredMillis = walletEntry.occurredMillis
- _ok = currentOccurredMillis == _previousOccurredMillis
-
- if(_ok) {
- buffer += walletEntry
- }
- }
-
- buffer.toList
- } else {
- null
- }
- } finally {
- cursor.close()
- }
- }
- }
-
- def findPreviousEntry(userId: String, resource: String,
- instanceId: String,
- finalized: Option[Boolean]): List[WalletEntry] = {
- val q = new BasicDBObject()
- q.put(WalletJsonNames.userId, userId)
- q.put(WalletJsonNames.resource, resource)
- q.put(WalletJsonNames.instanceId, instanceId)
- finalized match {
- case Some(x) => q.put(WalletJsonNames.finalized, x)
- case None =>
- }
-
- MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
- }
- //-WalletEntryStore
-
//+IMEventStore
def createIMEventFromJson(json: String) = {
MongoDBStore.createIMEventFromJson(json)
final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
/**
- * Collection holding [[gr.grnet.aquarium.event.model.WalletEntry]].
- *
- * Wallet entries are generated internally in Aquarium.
- */
- final val WALLET_ENTRIES_COLLECTION = "wallets"
-
- /**
* Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
*/
// final val POLICIES_COLLECTION = "policies"
UserState.fromJson(JSON.serialize(dbObj))
}
- def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
- WalletEntry.fromJson(JSON.serialize(dbObj))
- }
-
def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
PolicyEntry.fromJson(JSON.serialize(dbObj))
}
def userStateStore = _mongoDBStore
def resourceEventStore = _mongoDBStore
- def walletEntryStore = _mongoDBStore
def imEventStore = _mongoDBStore
def policyStore = _mongoDBStore
}
object Tags {
final val ResourceEventTag = newTag("ResourceEvent")
final val IMEventTag = newTag("IMEvent")
+ final val UserStateTag = newTag("UserState")
}
# Override the event store (if present, it will not be given by the store provider above)
resource.event.store.class=gr.grnet.aquarium.store.memory.MemStore
-# Override the WalletEntry store (if present, it will not be given by the store provider above)
-wallet.entry.store.class=gr.grnet.aquarium.store.memory.MemStore
-
# Override the user event store (if present, it will not be given by the store provider above)
user.event.store.class=gr.grnet.aquarium.store.memory.MemStore
package gr.grnet.aquarium.logic.test
import gr.grnet.aquarium.util.TestMethods
-import org.junit.{Test}
-import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import org.junit.Test
import java.util.Date
import junit.framework.Assert._
-import gr.grnet.aquarium.logic.accounting.{Accounting}
-import gr.grnet.aquarium.event.model.WalletEntry
-import com.ckkloverdos.maybe.Just
-import gr.grnet.aquarium.event.model.resource.StdResourceEvent
+import gr.grnet.aquarium.logic.accounting.Accounting
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLUtils, Timeslot}
/**
* Tests for the methods that do accounting
* @author Georgios Gousios <gousiosg@gmail.com>
*/
class AccountingTest extends DSLTestBase with Accounting with TestMethods {
-
@Test
def testAlignTimeslots() {
var a = List(Timeslot(0,1))
val from = new Date(1322555880000L) //Tue, 29 Nov 2011 10:38:00 EET
val to = new Date(1322689082000L) //Wed, 30 Nov 2011 23:38:02 EET
val agr = dsl.findAgreement("complextimeslots").get
- a = resolveEffectiveAlgorithmsForTimeslot(Timeslot(from, to), agr).keySet.toList
- b = resolveEffectivePricelistsForTimeslot(Timeslot(from, to), agr).keySet.toList
+ a = dslUtils.resolveEffectiveAlgorithmsForTimeslot(Timeslot(from, to), agr).keySet.toList
+ b = dslUtils.resolveEffectivePricelistsForTimeslot(Timeslot(from, to), agr).keySet.toList
result = alignTimeslots(a, b)
assertEquals(9, result.size)
val agr = dsl.findAgreement("scaledbandwidth").get
- val alg = resolveEffectiveAlgorithmsForTimeslot(Timeslot(from, to), agr)
- val price = resolveEffectivePricelistsForTimeslot(Timeslot(from, to), agr)
+ val alg = dslUtils.resolveEffectiveAlgorithmsForTimeslot(Timeslot(from, to), agr)
+ val price = dslUtils.resolveEffectivePricelistsForTimeslot(Timeslot(from, to), agr)
val chunks = splitChargeChunks(alg, price)
val algChunks = chunks._1
val priceChunks = chunks._2
t => assertEquals(t._1, t._2)
}
}
-
- @Test
- def testChargeEvent(): Unit = {
- before
- val agr = dsl.findAgreement("scaledbandwidth").get
-
- //Simple, continuous resource
- var evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "bandwidthup", "1", 123, "1", Map())
- var wallet = chargeEvent(evt, agr, 112, new Date(1325755902000L), List(), None)
- wallet match {
- case Just(x) => assertEquals(2, x.size)
- case _ => fail("No results returned")
- }
-
- //Complex resource event without details, should fail
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "vmtime", "1", 1, "1", Map())
- assertFailed[Exception, List[WalletEntry]](chargeEvent(evt, agr, 1, new Date(1325755902000L), List(), None))
-
- //Complex, onoff resource
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "vmtime", "1", 1, "1", Map("vmid" -> "3"))
- wallet = chargeEvent(evt, agr, 0, new Date(1325755902000L), List(), None)
- wallet match {
- case Just(x) => assertEquals(2, x.size)
- case _ => fail("No results returned")
- }
-
- //Complex, onoff resource, with wrong states, should fail
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "vmtime", "1", 1, "1", Map("vmid" -> "3"))
- assertFailed[Exception, List[WalletEntry]](chargeEvent(evt, agr, 1, new Date(1325755902000L), List(), None))
-
- //Simple, discrete resource
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "bookpages", "1", 120, "1", Map())
- wallet = chargeEvent(evt, agr, 15, new Date(1325755902000L), List(), None)
- wallet match {
- case Just(x) => assertEquals(1, x.size)
- case _ => fail("No results returned")
- }
-
- //Simple, discrete resource, time of last update equal to current event's occurred time
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "bookpages", "1", 120, "1", Map())
- wallet = chargeEvent(evt, agr, 15, new Date(1325762772000L), List(), None)
- assertEquals(1, wallet.getOr(List(WalletEntry.zero, WalletEntry.zero)).size)
-
- //Simple, continuous resource, time of last update equal to current event's occurred time
- evt = StdResourceEvent("123", 1325762772000L, 1325762774000L, "12", "1", "bandwidthup", "1", 123, "1", Map())
- wallet = chargeEvent(evt, agr, 15, new Date(1325762772000L), List(), None)
- assertEquals(0, wallet.getOr(List(WalletEntry.zero)).size)
- }
}
\ No newline at end of file
DiskspacePriceUnit
)
- val Computations = new UserStateComputations
+ val aquarium = AquariumInstance.withStoreProviderClass(classOf[MemStore])
+ Policy.withConfigurator(aquarium)
+ val StoreProvider = aquarium.storeProvider
+ val ResourceEventStore = StoreProvider.resourceEventStore
- val DefaultPolicy = new DSL{} parse PolicyYAML
+ val Computations = aquarium.userStateComputations
+
+ val DSL = new DSL {}
+ val DefaultPolicy = DSL parse PolicyYAML
val DefaultAccounting = new Accounting{}
val DefaultAlgorithm = new ExecutableCostPolicyAlgorithm {
val Synnefo = ClientSim("synnefo")(TheUIDGenerator)
val Pithos = ClientSim("pithos" )(TheUIDGenerator)
- val aquarium = AquariumInstance.withStoreProviderClass(classOf[MemStore])
- Policy.withConfigurator(aquarium)
- val StoreProvider = aquarium.storeProvider
- val ResourceEventStore = StoreProvider.resourceEventStore
-
val StartOfBillingYearDateCalc = new MutableDateCalc(2012, 1, 1)
val UserCreationDate = new MutableDateCalc(2011, 11, 1).toDate
val InitialUserState = UserState.createInitialUserState(
userID = UserCKKL.userId,
userCreationMillis = UserCreationDate.getTime,
- isActive = true,
- credits = 0.0,
- roleNames = List("user"),
- agreementName = DSLAgreement.DefaultAgreementName
+ totalCredits = 0.0,
+ initialRole = "default",
+ initialAgreement = DSLAgreement.DefaultAgreementName
)
// By convention
def showUserState(clog: ContextualLogger, userState: UserState) {
val id = userState._id
val parentId = userState.parentUserStateId
- val credits = userState.creditsSnapshot.creditAmount
+ val credits = userState.totalCredits
val newWalletEntries = userState.newWalletEntries.map(_.toDebugString)
val changeReason = userState.lastChangeReason
val implicitlyIssued = userState.implicitlyIssuedSnapshot.implicitlyIssuedEvents.map(_.toDebugString())
Computations.doFullMonthlyBilling(
UserCKKL.userId,
billingMonthInfo,
- StoreProvider,
InitialUserState,
DefaultResourcesMap,
- DefaultAccounting,
- DefaultCompiler,
MonthlyBillingCalculation(billingMonthInfo),
Some(clog)
)
}
-
- private[this]
- def justUserState(userStateM: Maybe[UserState]): UserState = {
- userStateM match {
- case Just(userState) ⇒ userState
- case _ ⇒ throw new AquariumException("Unexpected %s".format(userStateM))
- }
- }
-
+
private[this]
def expectCredits(clog: ContextualLogger,
creditsConsumed: Double,
userState: UserState,
accuracy: Double = 0.001): Unit = {
- val computed = userState.creditsSnapshot.creditAmount
+ val computed = userState.totalCredits
Assert.assertEquals(-creditsConsumed, computed, accuracy)
clog.info("Consumed %.3f credits [accuracy = %f]", creditsConsumed, accuracy)
}