From 0e00e810df2c3bd7e3a98f4e0d1be4bc54ce71f3 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Wed, 30 May 2012 18:11:28 +0300 Subject: [PATCH] WIP Resource event handling --- src/main/scala/gr/grnet/aquarium/Aquarium.scala | 17 ++++ .../actor/message/GetUserBalanceRequest.scala | 5 +- .../actor/message/GetUserBalanceResponse.scala | 8 +- .../actor/message/GetUserStateRequest.scala | 5 +- .../actor/message/RouterResponseMessage.scala | 17 ++-- .../actor/message/UserActorRequestMessage.scala | 4 +- .../actor/message/config/InitializeUserState.scala | 13 ++- .../actor/message/event/ProcessIMEvent.scala | 4 +- .../actor/message/event/ProcessResourceEvent.scala | 5 +- .../aquarium/actor/service/rest/RESTActor.scala | 16 +-- .../actor/service/router/RouterActor.scala | 10 +- .../aquarium/actor/service/user/UserActor.scala | 102 +++++++++++++++----- .../gr/grnet/aquarium/computation/UserState.scala | 10 ++ .../computation/UserStateBootstrappingData.scala | 50 ++++++++++ .../computation/UserStateComputations.scala | 29 +++--- .../aquarium/computation/UserStateWorker.scala | 6 +- .../computation/data/IMStateSnapshot.scala | 37 +++++-- .../aquarium/computation/data/RoleHistory.scala | 21 ++-- .../computation/data/RoleHistoryItem.scala | 2 +- .../aquarium/simulation/ResourceInstanceSim.scala | 2 +- .../gr/grnet/aquarium/simulation/UserSim.scala | 4 +- .../aquarium/store/mongodb/MongoDBIMEvent.scala | 18 ++++ .../aquarium/user/UserStateComputationsTest.scala | 23 +++-- 23 files changed, 308 insertions(+), 100 deletions(-) create mode 100644 src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala diff --git a/src/main/scala/gr/grnet/aquarium/Aquarium.scala b/src/main/scala/gr/grnet/aquarium/Aquarium.scala index c5a5f4e..6e96df5 100644 --- a/src/main/scala/gr/grnet/aquarium/Aquarium.scala +++ b/src/main/scala/gr/grnet/aquarium/Aquarium.scala @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicBoolean import gr.grnet.aquarium.ResourceLocator._ import gr.grnet.aquarium.computation.UserStateComputations import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler} +import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap +import gr.grnet.aquarium.logic.accounting.Policy /** * This is the Aquarium entry point. @@ -381,6 +383,21 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { } def storeProvider = _storeProvider + + def currentResourcesMap: DSLResourcesMap = { + // FIXME: Get rid of this singleton stuff + Policy.policy.resourcesMap + } + + def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = { + // FIXME: Where is the mapping? + "default" + } + + def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = { + // FIXME: Where is the mapping? + 10000.0 + } def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = { val map = this.props.map diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala index ce7fe67..9fcf6e1 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala @@ -43,4 +43,7 @@ package gr.grnet.aquarium.actor.message case class GetUserBalanceRequest(userID: String, timestamp: Long) extends ActorMessage with RouterRequestMessage - with UserActorRequestMessage + with UserActorRequestMessage { + + def referenceTimeMillis = timestamp +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala index 790984e..47cf87d 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala @@ -41,6 +41,8 @@ package gr.grnet.aquarium.actor.message */ case class GetUserBalanceResponse( - userID: String, - balance: Either[String, Double]) -extends RouterResponseMessage(balance) + balance: Either[String, GetUserBalanceResponseData], + override val suggestedHTTPStatus: Int = 200) +extends RouterResponseMessage(balance, suggestedHTTPStatus) + +case class GetUserBalanceResponseData(userID: String, balance: Double) diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala index 80f9c11..1086df2 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala @@ -43,4 +43,7 @@ package gr.grnet.aquarium.actor.message case class GetUserStateRequest(userID: String, timestamp: Long) extends ActorMessage with RouterRequestMessage - with UserActorRequestMessage + with UserActorRequestMessage { + + def referenceTimeMillis = timestamp +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala b/src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala index 6fad97e..424be05 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala @@ -43,16 +43,17 @@ import gr.grnet.aquarium.converter.{StdConverters, PrettyJsonTextFormat} * @author Christos KK Loverdos */ -abstract class RouterResponseMessage[T](val response: Either[String, T]) extends ActorMessage { +abstract class RouterResponseMessage[T]( + val response: Either[String, T], + val suggestedHTTPStatus: Int = 200) +extends ActorMessage { + def isError = response.isRight def responseToJsonString: String = { - response match { - case Left(error) ⇒ - "{}" - - case Right(data) ⇒ - StdConverters.AllConverters.convertEx[PrettyJsonTextFormat](data).value - } + response.fold( + _ ⇒ "", // No JSON response on error + data ⇒ StdConverters.AllConverters.convertEx[PrettyJsonTextFormat](data).value + ) } } diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala b/src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala index c5eb6fd..1b327d6 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala @@ -41,4 +41,6 @@ package gr.grnet.aquarium.actor.message * @author Christos KK Loverdos */ -trait UserActorRequestMessage extends ActorMessage +trait UserActorRequestMessage extends ActorMessage { + def referenceTimeMillis: Long +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala b/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala index 2943c0f..ce841c9 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala @@ -35,7 +35,9 @@ package gr.grnet.aquarium.actor.message.config -import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage} +import gr.grnet.aquarium.actor.message.ActorMessage +import gr.grnet.aquarium.util.shortClassNameOf +import gr.grnet.aquarium.util.date.MutableDateCalc /** @@ -44,4 +46,11 @@ import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage} * @author Christos KK Loverdos */ -case class InitializeUserState(userID: String) extends ActorMessage with ActorConfigurationMessage +case class InitializeUserState(userID: String, referenceTimeMillis: Long) +extends ActorMessage + with ActorConfigurationMessage { + + override def toString = { + "%s(%s, %s)".format(shortClassNameOf(this), userID, new MutableDateCalc(referenceTimeMillis).toString) + } +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala b/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala index 7dc3dc0..4f7eee0 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala @@ -47,4 +47,6 @@ import gr.grnet.aquarium.event.model.im.IMEventModel * @author Christos KK Loverdos */ -case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage +case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage { + def referenceTimeMillis = imEvent.occurredMillis +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala index fae5cad..63a7447 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala @@ -48,4 +48,7 @@ import gr.grnet.aquarium.event.model.resource.ResourceEventModel */ case class ProcessResourceEvent(rcEvent: ResourceEventModel) extends ActorMessage - with UserActorRequestMessage + with UserActorRequestMessage { + + def referenceTimeMillis = rcEvent.occurredMillis +} diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala index 8fd252a..e26f4a3 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala @@ -143,8 +143,8 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { private[this] def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = { - val configurator = Aquarium.Instance - val actorProvider = configurator.actorProvider + val aquarium = Aquarium.Instance + val actorProvider = aquarium.actorProvider val router = actorProvider.actorForRole(RouterRole) val futureResponse = router ask message @@ -152,7 +152,9 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { future ⇒ future.value match { case None ⇒ - // TODO: Will this ever happen?? + // TODO: Will this ever happen?? + logger.warn("Future did not complete for %s".format(message)) + responder.complete(stringResponse(500, "Internal Server Error", "text/plain")) case Some(Left(error)) ⇒ logger.error("Error serving %s: %s".format(message, error)) @@ -162,14 +164,14 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { actualResponse match { case routerResponse: RouterResponseMessage[_] ⇒ routerResponse.response match { - case Left(error) ⇒ - logger.error("Error %s serving %s: Response is: %s".format(error, message, actualResponse)) - responder.complete(stringResponse(500, "Internal Server Error", "text/plain")) + case Left(errorMessage) ⇒ + logger.error("Error '%s' serving %s. Response is: %s".format(errorMessage, message, actualResponse)) + responder.complete(stringResponse(routerResponse.suggestedHTTPStatus, errorMessage, "text/plain")) case Right(response) ⇒ responder.complete( HttpResponse( - status = 200, + routerResponse.suggestedHTTPStatus, body = routerResponse.responseToJsonString.getBytes("UTF-8"), headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil)) } diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala index 1accc85..6d10bab 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala @@ -58,28 +58,28 @@ class RouterActor extends ReflectiveRoleableActor { def role = RouterRole - private[this] def _launchUserActor(userID: String): ActorRef = { + private[this] def _launchUserActor(userID: String, referenceTimeMillis: Long): ActorRef = { // create a fresh instance val userActor = _actorProvider.actorForRole(UserActorRole) UserActorCache.put(userID, userActor) - userActor ! InitializeUserState(userID) + userActor ! InitializeUserState(userID, referenceTimeMillis) userActor } - private[this] def _findOrCreateUserActor(userID: String): ActorRef = { + private[this] def _findOrCreateUserActor(userID: String, referenceTimeMillis: Long): ActorRef = { UserActorCache.get(userID) match { case Some(userActorRef) ⇒ userActorRef case None ⇒ - _launchUserActor(userID) + _launchUserActor(userID, referenceTimeMillis) } } private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = { - _findOrCreateUserActor(userID) forward m + _findOrCreateUserActor(userID, m.referenceTimeMillis) forward m } diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 12a2c08..23a9413 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -46,8 +46,11 @@ import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEve import gr.grnet.aquarium.computation.data.IMStateSnapshot import gr.grnet.aquarium.event.model.im.IMEventModel import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded} -import gr.grnet.aquarium.computation.NewUserState -import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} +import gr.grnet.aquarium.actor.message.{GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} +import gr.grnet.aquarium.computation.{BillingMonthInfo, UserStateBootstrappingData, UserState} +import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.logic.accounting.Policy +import gr.grnet.aquarium.computation.reason.InitialUserStateSetup /** * @@ -57,8 +60,7 @@ import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequ class UserActor extends ReflectiveRoleableActor { private[this] var _userID: String = "" private[this] var _imState: IMStateSnapshot = _ -// private[this] var _userState: UserState = _ - private[this] var _newUserState: NewUserState = _ + private[this] var _userState: UserState = _ self.lifeCycle = Temporary @@ -80,13 +82,14 @@ class UserActor extends ReflectiveRoleableActor { def role = UserActorRole private[this] def aquarium: Aquarium = Aquarium.Instance + private[this] def userStateComputations = aquarium.userStateComputations - private[this] def _timestampTheshold = + private[this] def _timestampTheshold = { aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000) - + } private[this] def _haveUserState = { - this._newUserState ne null + this._userState ne null } private[this] def _haveIMState = { @@ -99,7 +102,8 @@ class UserActor extends ReflectiveRoleableActor { def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { } - private[this] def createIMState(userID: String): Unit = { + private[this] def createIMState(event: InitializeUserState): Unit = { + val userID = event.userID val store = aquarium.imEventStore // TODO: Optimization: Since IMState only records roles, we should incrementally // TODO: built it only for those IMEvents that changed the role. @@ -111,7 +115,7 @@ class UserActor extends ReflectiveRoleableActor { IMStateSnapshot.initial(imEvent) case currentState ⇒ - currentState.copyWithEvent(imEvent) + currentState.updateHistoryWithEvent(imEvent) } this._imState = newState @@ -120,7 +124,47 @@ class UserActor extends ReflectiveRoleableActor { DEBUG("Recomputed %s = %s", shortNameOfClass(classOf[IMStateSnapshot]), this._imState) } - private[this] def createUserState(userID: String): Unit = { + /** + * Resource events are processed only if the user has been activated. + */ + private[this] def shouldProcessResourceEvents: Boolean = { + _haveIMState && this._imState.hasBeenActivated + } + + private[this] def createUserState(event: InitializeUserState): Unit = { + val userID = event.userID + val referenceTime = event.referenceTimeMillis + + if(!_haveIMState) { + // Should have been created from `createIMState()` + DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState) + return + } + + if(!this._imState.hasBeenActivated) { + // Cannot set the initial state! + DEBUG("Cannot create user state from %s, since user is inactive", event) + return + } + + val userActivationMillis = this._imState.userActivationMillis.get + val initialRole = this._imState.roleHistory.firstRole.get.name + + val userStateBootstrap = UserStateBootstrappingData( + this._userID, + userActivationMillis, + initialRole, + aquarium.initialAgreementForRole(initialRole, userActivationMillis), + aquarium.initialBalanceForRole(initialRole, userActivationMillis) + ) + + userStateComputations.doFullMonthlyBilling( + userStateBootstrap, + BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()), + aquarium.currentResourcesMap, + InitialUserStateSetup, + None + ) } def onInitializeUserState(event: InitializeUserState): Unit = { @@ -128,13 +172,8 @@ class UserActor extends ReflectiveRoleableActor { this._userID = userID DEBUG("Got %s", event) - createIMState(userID) - createUserState(userID) - } - - private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = { - // FIXME: Implement based on the role - "default" + createIMState(event) + createUserState(event) } /** @@ -154,30 +193,43 @@ class UserActor extends ReflectiveRoleableActor { // This happens when the actor is brought to life, then immediately initialized, and then // sent the first IM event. But from the initialization procedure, this IM event will have // already been loaded from DB! - DEBUG("Ignoring first %s after birth", imEvent.toDebugString) + INFO("Ignoring first %s after birth", imEvent.toDebugString) return } - this._imState = this._imState.copyWithEvent(imEvent) + this._imState = this._imState.updateHistoryWithEvent(imEvent) - DEBUG("Update %s = %s", shortClassNameOf(this._imState), this._imState) + INFO("Update %s = %s", shortClassNameOf(this._imState), this._imState) } def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { val rcEvent = event.rcEvent - if(!_haveIMState) { + if(!shouldProcessResourceEvents) { // This means the user has not been activated. So, we do not process any resource event - INFO("Not processing %s", rcEvent.toJsonString) + DEBUG("Not processing %s", rcEvent.toJsonString) return } } def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = { - val userId = event.userID - // FIXME: Implement -// self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount)) + val userID = event.userID + + if(!_haveIMState) { + // No IMEvent has arrived, so this user is virtually unknown + self reply GetUserBalanceResponse(Left("User not found"), 404/*Not found*/) + } + else if(!_haveUserState) { + // The user is known but we have no state. + // Ridiculous. Should have been created at least during initialization. + } + + if(!_haveUserState) { + self reply GetUserBalanceResponse(Left("Not found"), 404/*Not found*/) + } else { + self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits))) + } } def onGetUserStateRequest(event: GetUserStateRequest): Unit = { diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserState.scala b/src/main/scala/gr/grnet/aquarium/computation/UserState.scala index 2977dfa..e1c4cbb 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserState.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserState.scala @@ -242,6 +242,16 @@ object UserState { ) } + def createInitialUserState(usb: UserStateBootstrappingData): UserState = { + createInitialUserState( + usb.userID, + usb.userCreationMillis, + usb.initialCredits, + usb.initialRole, + usb.initialAgreement + ) + } + def createInitialUserStateFrom(us: UserState): UserState = { createInitialUserState( us.userID, diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala new file mode 100644 index 0000000..54fc9dc --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.computation + +/** + * Used to bootstrao the user state. + * + * @author Christos KK Loverdos + */ + +case class UserStateBootstrappingData( + userID: String, + userCreationMillis: Long, + initialRole: String, + initialAgreement: String, + initialCredits: Double +) diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala index 3545449..8649d95 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala @@ -60,9 +60,8 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { protected lazy val userStateStore = storeProvider.userStateStore protected lazy val resourceEventStore = storeProvider.resourceEventStore - def findUserStateAtEndOfBillingMonth(userId: String, + def findUserStateAtEndOfBillingMonth(userStateBootstrap: UserStateBootstrappingData, billingMonthInfo: BillingMonthInfo, - currentUserState: UserState, defaultResourcesMap: DSLResourcesMap, calculationReason: UserStateChangeReason, clogOpt: Option[ContextualLogger] = None): UserState = { @@ -75,15 +74,15 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { def doCompute: UserState = { doFullMonthlyBilling( - userId, + userStateBootstrap, billingMonthInfo, - currentUserState, defaultResourcesMap, calculationReason, Some(clog)) } - val userCreationMillis = currentUserState.userCreationMillis + val userID = userStateBootstrap.userID + val userCreationMillis = userStateBootstrap.userCreationMillis val userCreationDateCalc = new MutableDateCalc(userCreationMillis) val billingMonthStartMillis = billingMonthInfo.startMillis val billingMonthStopMillis = billingMonthInfo.stopMillis @@ -93,7 +92,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { clog.debug("User did not exist before %s", userCreationDateCalc) // NOTE: Reason here will be: InitialUserStateSetup$ - val initialUserState0 = UserState.createInitialUserStateFrom(currentUserState) + val initialUserState0 = UserState.createInitialUserState(userStateBootstrap) val initialUserState1 = userStateStore.insertUserState(initialUserState0) clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1)) @@ -103,7 +102,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { } else { // Ask DB cache for the latest known user state for this billing period val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth( - userId, + userID, billingMonthInfo.year, billingMonthInfo.month) @@ -120,7 +119,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { // For this reason, we must count the events again. val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod( - userId, + userID, billingMonthStartMillis, billingMonthStopMillis) @@ -150,10 +149,9 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { } //+ Utility methods - def rcDebugInfo(rcEvent: ResourceEventModel) = { + protected def rcDebugInfo(rcEvent: ResourceEventModel) = { rcEvent.toDebugString(false) } - //- Utility methods def processResourceEvent(startingUserState: UserState, @@ -164,7 +162,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { walletEntriesBuffer: mutable.Buffer[NewWalletEntry], clogOpt: Option[ContextualLogger] = None): UserState = { - val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id) + val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id) var _workingUserState = startingUserState @@ -322,13 +320,13 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { } - def doFullMonthlyBilling(userId: String, + def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData, billingMonthInfo: BillingMonthInfo, - currentUserState: UserState, defaultResourcesMap: DSLResourcesMap, calculationReason: UserStateChangeReason = NoSpecificChangeReason, clogOpt: Option[ContextualLogger] = None): UserState = { + val userID = userStateBootstrap.userID val clog = ContextualLogger.fromOther( clogOpt, @@ -339,9 +337,8 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { val clogSome = Some(clog) val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth( - userId, + userStateBootstrap, billingMonthInfo.previousMonth, - currentUserState, defaultResourcesMap, calculationReason.forPreviousBillingMonth, clogSome @@ -364,7 +361,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { // First, find and process the actual resource events from DB val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod( - userId, + userID, billingMonthStartMillis, billingMonthEndMillis) diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateWorker.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateWorker.scala index 5e5ecd1..5c12eb6 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserStateWorker.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserStateWorker.scala @@ -138,8 +138,10 @@ case class UserStateWorker(userID: String, * * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]] */ - def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long - ): (List[ResourceEventModel], List[ResourceEventModel]) = { + def findAndRemoveGeneratorsOfImplicitEndEvents( + newOccuredMillis: Long + ): (List[ResourceEventModel], List[ResourceEventModel]) = { + val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]() val checkSet = mutable.Set[ResourceEventModel]() diff --git a/src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala b/src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala index 8b2642e..fbec707 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala @@ -36,6 +36,8 @@ package gr.grnet.aquarium.computation.data import gr.grnet.aquarium.event.model.im.IMEventModel +import gr.grnet.aquarium.util.shortClassNameOf +import gr.grnet.aquarium.util.date.MutableDateCalc /** * @@ -43,25 +45,42 @@ import gr.grnet.aquarium.event.model.im.IMEventModel */ case class IMStateSnapshot( - /** - * True if the user has ever been activated even once. - */ - hasBeenActivated: Boolean, /** * This is the latest processed IMEvent */ latestIMEvent: IMEventModel, /** + * The earliest activation time, if it exists. + */ + userActivationMillis: Option[Long], + + /** * This is the recorded role history */ roleHistory: RoleHistory) { - def copyWithEvent(imEvent: IMEventModel) = { + /** + * True iff the user has ever been activated even once. + */ + def hasBeenActivated: Boolean = { + userActivationMillis.isDefined + } + + def updateHistoryWithEvent(imEvent: IMEventModel) = { copy( - hasBeenActivated = this.hasBeenActivated || imEvent.isActive, - latestIMEvent = imEvent, - roleHistory = this.roleHistory.copyWithRole(imEvent.role, imEvent.occurredMillis) + userActivationMillis = if(imEvent.isStateActive) Some(imEvent.occurredMillis) else this.userActivationMillis, + latestIMEvent = imEvent, + roleHistory = this.roleHistory.updateWithRole(imEvent.role, imEvent.occurredMillis) + ) + } + + override def toString = { + "%s(\n!! %s\n!! %s\n!! %s)".format( + shortClassNameOf(this), + latestIMEvent, + userActivationMillis.map(new MutableDateCalc(_)), + roleHistory ) } } @@ -69,8 +88,8 @@ case class IMStateSnapshot( object IMStateSnapshot { def initial(imEvent: IMEventModel): IMStateSnapshot = { IMStateSnapshot( - imEvent.isActive, imEvent, + if(imEvent.isStateActive) Some(imEvent.occurredMillis) else None, RoleHistory.initial(imEvent.role, imEvent.occurredMillis)) } } diff --git a/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala b/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala index 1b09bee..760d523 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala @@ -59,13 +59,22 @@ case class RoleHistory( TreeMap(roles.map(role ⇒ (role.timeslot, role)): _*) } - def copyWithRole(role: String, validFrom: Long) = { + def updateWithRole(role: String, validFrom: Long) = { + def fixValidTo(validFrom: Long, validTo: Long): Long = { + if(validTo == validFrom) { + // Since validTo is exclusive, make at least 1ms gap + validFrom + 1 + } else { + validTo + } + } + val newItems = roles match { case Nil ⇒ RoleHistoryItem(role, validFrom) :: Nil case head :: tail ⇒ - if(head.isStrictlyAfter(validFrom)) { + if(head.startsStrictlyAfter(validFrom)) { // must search history items to find where this fits in @tailrec def check(allChecked: ListBuffer[RoleHistoryItem], @@ -74,16 +83,16 @@ case class RoleHistory( toCheck match { case Nil ⇒ - allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom)) + allChecked.append(RoleHistoryItem(role, validFrom, fixValidTo(validFrom, lastCheck.validFrom))) allChecked.toList case toCheckHead :: toCheckTail ⇒ - if(toCheckHead.isStrictlyAfter(validFrom)) { + if(toCheckHead.startsStrictlyAfter(validFrom)) { allChecked.append(toCheckHead) check(allChecked, toCheckHead, toCheckTail) } else { - allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom)) + allChecked.append(RoleHistoryItem(role, validFrom, fixValidTo(validFrom, lastCheck.validFrom))) allChecked.toList } } @@ -94,7 +103,7 @@ case class RoleHistory( check(buffer, head, tail) } else { // assume head.validTo goes to infinity, - RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(validFrom) :: tail + RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(fixValidTo(head.validFrom, validFrom)) :: tail } } diff --git a/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala b/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala index c20100c..59dca21 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala @@ -87,7 +87,7 @@ case class RoleHistoryItem( validFrom <= time && time < validTo } - def isStrictlyAfter(time: Long) = { + def startsStrictlyAfter(time: Long) = { validFrom > time } diff --git a/src/main/scala/gr/grnet/aquarium/simulation/ResourceInstanceSim.scala b/src/main/scala/gr/grnet/aquarium/simulation/ResourceInstanceSim.scala index bbad767..1be7e1d 100644 --- a/src/main/scala/gr/grnet/aquarium/simulation/ResourceInstanceSim.scala +++ b/src/main/scala/gr/grnet/aquarium/simulation/ResourceInstanceSim.scala @@ -61,7 +61,7 @@ class ResourceInstanceSim (val resource: ResourceSim, uidGen.nextUID(), occurredMillis, receivedMillis, - owner.userId, + owner.userID, client.clientId, resource.name, instanceId, diff --git a/src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala b/src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala index dd5cd0b..c369714 100644 --- a/src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala +++ b/src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala @@ -44,7 +44,7 @@ import gr.grnet.aquarium.event.model.resource.ResourceEventModel * @author Christos KK Loverdos */ -case class UserSim(userId: String, userCreationDate: Date, aquarium: AquariumSim) { userSelf ⇒ +case class UserSim(userID: String, userCreationDate: Date, aquarium: AquariumSim) { userSelf ⇒ private[this] def resourceEventStore = aquarium.resourceEventStore @@ -54,7 +54,7 @@ case class UserSim(userId: String, userCreationDate: Date, aquarium: AquariumSim } def myResourceEvents: List[ResourceEventModel] = { - resourceEventStore.findResourceEventsByUserId(userId)(None) + resourceEventStore.findResourceEventsByUserId(userID)(None) } def myResourceEventsByReceivedDate: List[ResourceEventModel] = { diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBIMEvent.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBIMEvent.scala index bd2713c..34dc9c5 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBIMEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBIMEvent.scala @@ -40,6 +40,7 @@ import gr.grnet.aquarium.util._ import com.mongodb.DBObject import com.mongodb.util.JSON import gr.grnet.aquarium.event.model.im.IMEventModel +import gr.grnet.aquarium.util.date.MutableDateCalc /** @@ -66,6 +67,23 @@ case class MongoDBIMEvent( def withDetails(newDetails: Map[String, String], newOccurredMillis: Long) = this.copy(details = newDetails, occurredMillis = newOccurredMillis) + + override def toString = { + "%s(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)".format( + shortClassNameOf(this), + id, + new MutableDateCalc(occurredMillis).toString, + new MutableDateCalc(receivedMillis).toString, + userID, + clientID, + isActive, + role, + eventVersion, + eventType, + details, + _id + ) + } } object MongoDBIMEvent { diff --git a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala index 33b6eaf..4ab4e58 100644 --- a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala +++ b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala @@ -47,8 +47,8 @@ import org.junit.{Assert, Ignore, Test} import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler} import gr.grnet.aquarium.{AquariumException} import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance} -import gr.grnet.aquarium.computation.{UserState, BillingMonthInfo, UserStateComputations} import gr.grnet.aquarium.computation.reason.MonthlyBillingCalculation +import gr.grnet.aquarium.computation.{UserStateBootstrappingData, UserState, BillingMonthInfo, UserStateComputations} /** @@ -244,12 +244,20 @@ aquariumpolicy: val UserCKKL = Aquarium.newUser("CKKL", UserCreationDate) - val InitialUserState = UserState.createInitialUserState( - userID = UserCKKL.userId, - userCreationMillis = UserCreationDate.getTime, - totalCredits = 0.0, +// val InitialUserState = UserState.createInitialUserState( +// userID = UserCKKL.userID, +// userCreationMillis = UserCreationDate.getTime, +// totalCredits = 0.0, +// initialRole = "default", +// initialAgreement = DSLAgreement.DefaultAgreementName +// ) + + val UserStateBootstrap = UserStateBootstrappingData( + userID = UserCKKL.userID, + userCreationMillis = UserCreationDate.getTime(), initialRole = "default", - initialAgreement = DSLAgreement.DefaultAgreementName + initialAgreement = DSLAgreement.DefaultAgreementName, + initialCredits = 0.0 ) // By convention @@ -294,9 +302,8 @@ aquariumpolicy: private[this] def doFullMonthlyBilling(clog: ContextualLogger, billingMonthInfo: BillingMonthInfo) = { Computations.doFullMonthlyBilling( - UserCKKL.userId, + UserStateBootstrap, billingMonthInfo, - InitialUserState, DefaultResourcesMap, MonthlyBillingCalculation(billingMonthInfo), Some(clog) -- 1.7.10.4