WIP Resource event handling
authorChristos KK Loverdos <loverdos@gmail.com>
Wed, 30 May 2012 15:11:28 +0000 (18:11 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Wed, 30 May 2012 15:11:28 +0000 (18:11 +0300)
23 files changed:
src/main/scala/gr/grnet/aquarium/Aquarium.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala
src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala
src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala
src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserState.scala
src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala
src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala
src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/computation/UserState.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateWorker.scala
src/main/scala/gr/grnet/aquarium/computation/data/IMStateSnapshot.scala
src/main/scala/gr/grnet/aquarium/computation/data/RoleHistory.scala
src/main/scala/gr/grnet/aquarium/computation/data/RoleHistoryItem.scala
src/main/scala/gr/grnet/aquarium/simulation/ResourceInstanceSim.scala
src/main/scala/gr/grnet/aquarium/simulation/UserSim.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBIMEvent.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

index c5a5f4e..6e96df5 100644 (file)
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 import gr.grnet.aquarium.ResourceLocator._
 import gr.grnet.aquarium.computation.UserStateComputations
 import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
+import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
+import gr.grnet.aquarium.logic.accounting.Policy
 
 /**
  * This is the Aquarium entry point.
@@ -381,6 +383,21 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable {
   }
 
   def storeProvider = _storeProvider
+
+  /**
+   * The resources map, as exposed by the global `Policy.policy` singleton.
+   */
+  def currentResourcesMap: DSLResourcesMap = {
+    // FIXME: Get rid of this singleton stuff
+    val globalPolicy = Policy.policy
+    globalPolicy.resourcesMap
+  }
+
+  /**
+   * Name of the agreement a user with the given role starts with.
+   *
+   * Both parameters are currently ignored: the same agreement name is
+   * returned for every role and reference time.
+   */
+  def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = {
+    // FIXME: Where is the mapping?
+    val agreementName = "default"
+    agreementName
+  }
+
+  /**
+   * Balance a user with the given role starts with.
+   *
+   * Both parameters are currently ignored: the same amount is returned
+   * for every role and reference time.
+   */
+  def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
+    // FIXME: Where is the mapping?
+    val startingBalance: Double = 10000.0
+    startingBalance
+  }
   
   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
     val map = this.props.map
index ce7fe67..9fcf6e1 100644 (file)
@@ -43,4 +43,7 @@ package gr.grnet.aquarium.actor.message
 case class GetUserBalanceRequest(userID: String, timestamp: Long)
 extends ActorMessage
    with RouterRequestMessage
-   with UserActorRequestMessage
+   with UserActorRequestMessage {
+
+  def referenceTimeMillis = timestamp
+}
index 790984e..47cf87d 100644 (file)
@@ -41,6 +41,8 @@ package gr.grnet.aquarium.actor.message
  */
 
 case class GetUserBalanceResponse(
-    userID: String,
-    balance: Either[String, Double])
-extends RouterResponseMessage(balance)
+    balance: Either[String, GetUserBalanceResponseData],
+    override val suggestedHTTPStatus: Int = 200)
+extends RouterResponseMessage(balance, suggestedHTTPStatus)
+
+case class GetUserBalanceResponseData(userID: String, balance: Double)
index 80f9c11..1086df2 100644 (file)
@@ -43,4 +43,7 @@ package gr.grnet.aquarium.actor.message
 case class GetUserStateRequest(userID: String, timestamp: Long)
 extends ActorMessage
   with RouterRequestMessage
-  with UserActorRequestMessage
+  with UserActorRequestMessage {
+
+  def referenceTimeMillis = timestamp
+}
index 6fad97e..424be05 100644 (file)
@@ -43,16 +43,17 @@ import gr.grnet.aquarium.converter.{StdConverters, PrettyJsonTextFormat}
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-abstract class RouterResponseMessage[T](val response: Either[String, T]) extends ActorMessage {
+abstract class RouterResponseMessage[T](
+    val response: Either[String, T],
+    val suggestedHTTPStatus: Int = 200)
+extends ActorMessage {
+
-  def isError = response.isRight
+  /**
+   * True iff this message carries an error. Errors travel in the `Left`
+   * side of `response` (as an error message string), successful payloads
+   * in the `Right` side.
+   */
+  def isError = response.isLeft
 
+  /**
+   * Renders the payload as pretty-printed JSON text.
+   *
+   * @return the JSON text of the `Right` payload, or the empty string
+   *         when `response` is a `Left` (error).
+   */
   def responseToJsonString: String = {
-    response match {
-      case Left(error) ⇒
-        "{}"
-
-      case Right(data) ⇒
-        StdConverters.AllConverters.convertEx[PrettyJsonTextFormat](data).value
-    }
+    response.fold(
+      _    ⇒ "", // No JSON response on error
+      data ⇒ StdConverters.AllConverters.convertEx[PrettyJsonTextFormat](data).value
+    )
   }
 }
index c5eb6fd..1b327d6 100644 (file)
@@ -41,4 +41,6 @@ package gr.grnet.aquarium.actor.message
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-trait UserActorRequestMessage extends ActorMessage
+trait UserActorRequestMessage extends ActorMessage {
+  def referenceTimeMillis: Long
+}
index 2943c0f..ce841c9 100644 (file)
@@ -35,7 +35,9 @@
 
 package gr.grnet.aquarium.actor.message.config
 
-import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
+import gr.grnet.aquarium.actor.message.ActorMessage
+import gr.grnet.aquarium.util.shortClassNameOf
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 
 /**
@@ -44,4 +46,11 @@ import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-case class InitializeUserState(userID: String) extends ActorMessage with ActorConfigurationMessage
+case class InitializeUserState(userID: String, referenceTimeMillis: Long)
+extends ActorMessage
+   with ActorConfigurationMessage {
+
+  override def toString = {
+    "%s(%s, %s)".format(shortClassNameOf(this), userID, new MutableDateCalc(referenceTimeMillis).toString)
+  }
+}
index 7dc3dc0..4f7eee0 100644 (file)
@@ -47,4 +47,6 @@ import gr.grnet.aquarium.event.model.im.IMEventModel
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage
+case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage {
+  def referenceTimeMillis = imEvent.occurredMillis
+}
index fae5cad..63a7447 100644 (file)
@@ -48,4 +48,7 @@ import gr.grnet.aquarium.event.model.resource.ResourceEventModel
  */
 case class ProcessResourceEvent(rcEvent: ResourceEventModel)
 extends ActorMessage
-   with UserActorRequestMessage
+   with UserActorRequestMessage {
+
+  def referenceTimeMillis = rcEvent.occurredMillis
+}
index 8fd252a..e26f4a3 100644 (file)
@@ -143,8 +143,8 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
 
   private[this]
   def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
-    val configurator = Aquarium.Instance
-    val actorProvider = configurator.actorProvider
+    val aquarium = Aquarium.Instance
+    val actorProvider = aquarium.actorProvider
     val router = actorProvider.actorForRole(RouterRole)
     val futureResponse = router ask message
 
@@ -152,7 +152,9 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
       future ⇒
         future.value match {
           case None ⇒
-          // TODO: Will this ever happen??
+            // TODO: Will this ever happen??
+            logger.warn("Future did not complete for %s".format(message))
+            responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
 
           case Some(Left(error)) ⇒
             logger.error("Error serving %s: %s".format(message, error))
@@ -162,14 +164,14 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
             actualResponse match {
               case routerResponse: RouterResponseMessage[_] ⇒
                 routerResponse.response match {
-                  case Left(error) ⇒
-                    logger.error("Error %s serving %s: Response is: %s".format(error, message, actualResponse))
-                    responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+                  case Left(errorMessage) ⇒
+                    logger.error("Error '%s' serving %s. Response is: %s".format(errorMessage, message, actualResponse))
+                    responder.complete(stringResponse(routerResponse.suggestedHTTPStatus, errorMessage, "text/plain"))
 
                   case Right(response) ⇒
                     responder.complete(
                       HttpResponse(
-                        status = 200,
+                        routerResponse.suggestedHTTPStatus,
                         body = routerResponse.responseToJsonString.getBytes("UTF-8"),
                         headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil))
                 }
index 1accc85..6d10bab 100644 (file)
@@ -58,28 +58,28 @@ class RouterActor extends ReflectiveRoleableActor {
 
   def role = RouterRole
 
-  private[this] def _launchUserActor(userID: String): ActorRef = {
+  private[this] def _launchUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
     // create a fresh instance
     val userActor = _actorProvider.actorForRole(UserActorRole)
     UserActorCache.put(userID, userActor)
 
-    userActor ! InitializeUserState(userID)
+    userActor ! InitializeUserState(userID, referenceTimeMillis)
 
     userActor
   }
 
-  private[this] def _findOrCreateUserActor(userID: String): ActorRef = {
+  private[this] def _findOrCreateUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
     UserActorCache.get(userID) match {
       case Some(userActorRef) ⇒
         userActorRef
 
       case None ⇒
-        _launchUserActor(userID)
+        _launchUserActor(userID, referenceTimeMillis)
     }
   }
 
   private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
-    _findOrCreateUserActor(userID) forward m
+    _findOrCreateUserActor(userID, m.referenceTimeMillis) forward m
   }
 
 
index 12a2c08..23a9413 100644 (file)
@@ -46,8 +46,11 @@ import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEve
 import gr.grnet.aquarium.computation.data.IMStateSnapshot
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
-import gr.grnet.aquarium.computation.NewUserState
-import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.{GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.computation.{BillingMonthInfo, UserStateBootstrappingData, UserState}
+import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.logic.accounting.Policy
+import gr.grnet.aquarium.computation.reason.InitialUserStateSetup
 
 /**
  *
@@ -57,8 +60,7 @@ import gr.grnet.aquarium.actor.message.{GetUserBalanceResponse, GetUserStateRequ
 class UserActor extends ReflectiveRoleableActor {
   private[this] var _userID: String = "<?>"
   private[this] var _imState: IMStateSnapshot = _
-//  private[this] var _userState: UserState = _
-  private[this] var _newUserState: NewUserState = _
+  private[this] var _userState: UserState = _
 
   self.lifeCycle = Temporary
 
@@ -80,13 +82,14 @@ class UserActor extends ReflectiveRoleableActor {
   def role = UserActorRole
 
   private[this] def aquarium: Aquarium = Aquarium.Instance
+  private[this] def userStateComputations = aquarium.userStateComputations
 
-  private[this] def _timestampTheshold =
+  private[this] def _timestampTheshold = {
     aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
-
+  }
 
   private[this] def _haveUserState = {
-    this._newUserState ne null
+    this._userState ne null
   }
 
   private[this] def _haveIMState = {
@@ -99,7 +102,8 @@ class UserActor extends ReflectiveRoleableActor {
   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
-  private[this] def createIMState(userID: String): Unit = {
+  private[this] def createIMState(event: InitializeUserState): Unit = {
+    val userID = event.userID
     val store = aquarium.imEventStore
     // TODO: Optimization: Since IMState only records roles, we should incrementally
     // TODO:               built it only for those IMEvents that changed the role.
@@ -111,7 +115,7 @@ class UserActor extends ReflectiveRoleableActor {
           IMStateSnapshot.initial(imEvent)
 
         case currentState ⇒
-          currentState.copyWithEvent(imEvent)
+          currentState.updateHistoryWithEvent(imEvent)
       }
 
       this._imState = newState
@@ -120,7 +124,47 @@ class UserActor extends ReflectiveRoleableActor {
     DEBUG("Recomputed %s = %s", shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
   }
 
-  private[this] def createUserState(userID: String): Unit = {
+  /**
+   * Tells whether resource events may be processed for this user: an IM state
+   * must exist and it must record that the user has been activated.
+   */
+  private[this] def shouldProcessResourceEvents: Boolean = {
+    if(_haveIMState) this._imState.hasBeenActivated else false
+  }
+
+  /**
+   * Computes the user state via a full monthly billing run and keeps it in `_userState`.
+   *
+   * Nothing is computed (and `_userState` is left untouched) when there is no
+   * IM state yet or when the IM state says the user has never been activated.
+   *
+   * @param event the initialization message that triggered the computation;
+   *              it is only used for logging here.
+   */
+  private[this] def createUserState(event: InitializeUserState): Unit = {
+    if(!_haveIMState) {
+      // Should have been created from `createIMState()`
+      DEBUG("Cannot create user state from %s, since %s = %s", event, shortNameOfClass(classOf[IMStateSnapshot]), this._imState)
+      return
+    }
+
+    if(!this._imState.hasBeenActivated) {
+      // Cannot set the initial state!
+      DEBUG("Cannot create user state from %s, since user is inactive", event)
+      return
+    }
+
+    // The activation time is defined here because of the `hasBeenActivated` guard above.
+    val userActivationMillis = this._imState.userActivationMillis.get
+    // NOTE(review): assumes the role history always has a first role once an IM state exists - confirm.
+    val initialRole = this._imState.roleHistory.firstRole.get.name
+
+    val userStateBootstrap = UserStateBootstrappingData(
+      this._userID,
+      userActivationMillis,
+      initialRole,
+      aquarium.initialAgreementForRole(initialRole, userActivationMillis),
+      aquarium.initialBalanceForRole(initialRole, userActivationMillis)
+    )
+
+    // Retain the computed state, otherwise `_haveUserState` can never become true.
+    // The billing month is derived from the current time, not from the event's reference time.
+    this._userState = userStateComputations.doFullMonthlyBilling(
+      userStateBootstrap,
+      BillingMonthInfo.fromMillis(TimeHelpers.nowMillis()),
+      aquarium.currentResourcesMap,
+      InitialUserStateSetup,
+      None
+    )
   }
 
   def onInitializeUserState(event: InitializeUserState): Unit = {
@@ -128,13 +172,8 @@ class UserActor extends ReflectiveRoleableActor {
     this._userID = userID
     DEBUG("Got %s", event)
 
-    createIMState(userID)
-    createUserState(userID)
-  }
-
-  private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
-    // FIXME: Implement based on the role
-    "default"
+    createIMState(event)
+    createUserState(event)
   }
 
   /**
@@ -154,30 +193,43 @@ class UserActor extends ReflectiveRoleableActor {
       // This happens when the actor is brought to life, then immediately initialized, and then
       // sent the first IM event. But from the initialization procedure, this IM event will have
       // already been loaded from DB!
-      DEBUG("Ignoring first %s after birth", imEvent.toDebugString)
+      INFO("Ignoring first %s after birth", imEvent.toDebugString)
       return
     }
 
-    this._imState = this._imState.copyWithEvent(imEvent)
+    this._imState = this._imState.updateHistoryWithEvent(imEvent)
 
-    DEBUG("Update %s = %s", shortClassNameOf(this._imState), this._imState)
+    INFO("Update %s = %s", shortClassNameOf(this._imState), this._imState)
   }
 
   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
     val rcEvent = event.rcEvent
 
-    if(!_haveIMState) {
+    if(!shouldProcessResourceEvents) {
       // This means the user has not been activated. So, we do not process any resource event
-      INFO("Not processing %s", rcEvent.toJsonString)
+      DEBUG("Not processing %s", rcEvent.toJsonString)
       return
     }
   }
 
 
   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
-    val userId = event.userID
-    // FIXME: Implement
-//    self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
+    val userID = event.userID
+
+    if(!_haveIMState) {
+      // No IMEvent has arrived, so this user is virtually unknown
+      self reply GetUserBalanceResponse(Left("User not found"), 404/*Not found*/)
+    }
+    else if(!_haveUserState) {
+      // The user is known but we have no state.
+      // Ridiculous. Should have been created at least during initialization.
+    }
+
+    if(!_haveUserState) {
+      self reply GetUserBalanceResponse(Left("Not found"), 404/*Not found*/)
+    } else {
+      self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
+    }
   }
 
   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
index 2977dfa..e1c4cbb 100644 (file)
@@ -242,6 +242,16 @@ object UserState {
     )
   }
 
+  /**
+   * Creates the initial user state from bootstrapping data by delegating to
+   * the five-argument `createInitialUserState` overload.
+   */
+  def createInitialUserState(usb: UserStateBootstrappingData): UserState = {
+    val userID         = usb.userID
+    val creationMillis = usb.userCreationMillis
+    val credits        = usb.initialCredits
+    val role           = usb.initialRole
+    val agreement      = usb.initialAgreement
+
+    createInitialUserState(userID, creationMillis, credits, role, agreement)
+  }
+
   def createInitialUserStateFrom(us: UserState): UserState = {
     createInitialUserState(
       us.userID,
diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateBootstrappingData.scala
new file mode 100644 (file)
index 0000000..54fc9dc
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.computation
+
+/**
+ * Used to bootstrap the user state.
+ *
+ * @param userID             the ID of the user the state belongs to
+ * @param userCreationMillis the user creation time, in milliseconds
+ *                           (the user actor passes the user's activation time here)
+ * @param initialRole        the name of the role the user starts with
+ * @param initialAgreement   the name of the agreement the user starts with
+ * @param initialCredits     the credits the user starts with
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+case class UserStateBootstrappingData(
+    userID: String,
+    userCreationMillis: Long,
+    initialRole: String,
+    initialAgreement: String,
+    initialCredits: Double
+)
index 3545449..8649d95 100644 (file)
@@ -60,9 +60,8 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
   protected lazy val userStateStore     = storeProvider.userStateStore
   protected lazy val resourceEventStore = storeProvider.resourceEventStore
 
-  def findUserStateAtEndOfBillingMonth(userId: String,
+  def findUserStateAtEndOfBillingMonth(userStateBootstrap: UserStateBootstrappingData,
                                        billingMonthInfo: BillingMonthInfo,
-                                       currentUserState: UserState,
                                        defaultResourcesMap: DSLResourcesMap,
                                        calculationReason: UserStateChangeReason,
                                        clogOpt: Option[ContextualLogger] = None): UserState = {
@@ -75,15 +74,15 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
 
     def doCompute: UserState = {
       doFullMonthlyBilling(
-        userId,
+        userStateBootstrap,
         billingMonthInfo,
-        currentUserState,
         defaultResourcesMap,
         calculationReason,
         Some(clog))
     }
 
-    val userCreationMillis = currentUserState.userCreationMillis
+    val userID = userStateBootstrap.userID
+    val userCreationMillis = userStateBootstrap.userCreationMillis
     val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
     val billingMonthStartMillis = billingMonthInfo.startMillis
     val billingMonthStopMillis = billingMonthInfo.stopMillis
@@ -93,7 +92,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
       clog.debug("User did not exist before %s", userCreationDateCalc)
 
       // NOTE: Reason here will be: InitialUserStateSetup$
-      val initialUserState0 = UserState.createInitialUserStateFrom(currentUserState)
+      val initialUserState0 = UserState.createInitialUserState(userStateBootstrap)
       val initialUserState1 = userStateStore.insertUserState(initialUserState0)
 
       clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
@@ -103,7 +102,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
     } else {
       // Ask DB cache for the latest known user state for this billing period
       val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
-        userId,
+        userID,
         billingMonthInfo.year,
         billingMonthInfo.month)
 
@@ -120,7 +119,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
           // For this reason, we must count the events again.
           val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
           val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
-            userId,
+            userID,
             billingMonthStartMillis,
             billingMonthStopMillis)
 
@@ -150,10 +149,9 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
   }
 
   //+ Utility methods
-  def rcDebugInfo(rcEvent: ResourceEventModel) = {
+  protected def rcDebugInfo(rcEvent: ResourceEventModel) = {
     rcEvent.toDebugString(false)
   }
-
   //- Utility methods
 
   def processResourceEvent(startingUserState: UserState,
@@ -164,7 +162,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
                            clogOpt: Option[ContextualLogger] = None): UserState = {
 
-    val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "processResourceEvent(%s)", currentResourceEvent.id)
 
     var _workingUserState = startingUserState
 
@@ -322,13 +320,13 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
   }
 
 
-  def doFullMonthlyBilling(userId: String,
+  def doFullMonthlyBilling(userStateBootstrap: UserStateBootstrappingData,
                            billingMonthInfo: BillingMonthInfo,
-                           currentUserState: UserState,
                            defaultResourcesMap: DSLResourcesMap,
                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
                            clogOpt: Option[ContextualLogger] = None): UserState = {
 
+    val userID = userStateBootstrap.userID
 
     val clog = ContextualLogger.fromOther(
       clogOpt,
@@ -339,9 +337,8 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
     val clogSome = Some(clog)
 
     val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
-      userId,
+      userStateBootstrap,
       billingMonthInfo.previousMonth,
-      currentUserState,
       defaultResourcesMap,
       calculationReason.forPreviousBillingMonth,
       clogSome
@@ -364,7 +361,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable {
 
     // First, find and process the actual resource events from DB
     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
-      userId,
+      userID,
       billingMonthStartMillis,
       billingMonthEndMillis)
 
index 5e5ecd1..5c12eb6 100644 (file)
@@ -138,8 +138,10 @@ case class UserStateWorker(userID: String,
    *
    * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
    */
-  def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
-                                                  ): (List[ResourceEventModel], List[ResourceEventModel]) = {
+  def findAndRemoveGeneratorsOfImplicitEndEvents(
+      newOccuredMillis: Long
+  ): (List[ResourceEventModel], List[ResourceEventModel]) = {
+
     val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
     val checkSet = mutable.Set[ResourceEventModel]()
 
index 8b2642e..fbec707 100644 (file)
@@ -36,6 +36,8 @@
 package gr.grnet.aquarium.computation.data
 
 import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.util.shortClassNameOf
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 /**
  *
@@ -43,25 +45,42 @@ import gr.grnet.aquarium.event.model.im.IMEventModel
  */
 
 case class IMStateSnapshot(
-                          /**
-                           * True if the user has ever been activated even once.
-                           */
-                           hasBeenActivated: Boolean,
                            /**
                             * This is the latest processed IMEvent
                             */
                            latestIMEvent: IMEventModel,
 
                            /**
+                            * The earliest activation time, if it exists.
+                            */
+                           userActivationMillis: Option[Long],
+
+                           /**
                             * This is the recorded role history
                             */
                            roleHistory: RoleHistory) {
 
-  def copyWithEvent(imEvent: IMEventModel) = {
+  /**
+   * Tells whether an activation time has been recorded, i.e. whether the
+   * user has been activated at least once.
+   */
+  def hasBeenActivated: Boolean = {
+    userActivationMillis match {
+      case Some(_) ⇒ true
+      case None    ⇒ false
+    }
+  }
+
+  /**
+   * Returns a copy of this snapshot that takes the given event into account:
+   * the event becomes the latest one, its role is added to the role history and,
+   * if the event marks the user as active, the activation time is updated.
+   *
+   * The activation time is kept as the earliest one seen, so a later "active"
+   * event does not move it forward.
+   */
+  def updateHistoryWithEvent(imEvent: IMEventModel) = {
+    val earliestActivationMillis: Option[Long] =
+      if(imEvent.isStateActive) {
+        val occurredMillis = imEvent.occurredMillis
+        // Keep the minimum of the known activation time (if any) and this event's time.
+        Some(this.userActivationMillis.map(known ⇒ math.min(known, occurredMillis)).getOrElse(occurredMillis))
+      } else {
+        this.userActivationMillis
+      }
+
     copy(
-      hasBeenActivated = this.hasBeenActivated || imEvent.isActive,
-      latestIMEvent    = imEvent,
-      roleHistory      = this.roleHistory.copyWithRole(imEvent.role, imEvent.occurredMillis)
+      userActivationMillis = earliestActivationMillis,
+      latestIMEvent = imEvent,
+      roleHistory   = this.roleHistory.updateWithRole(imEvent.role, imEvent.occurredMillis)
+    )
+  }
+
+  /**
+   * Multi-line rendering: short class name, latest IM event, activation
+   * time (as an optional `MutableDateCalc`) and role history.
+   */
+  override def toString = {
+    val className = shortClassNameOf(this)
+    val activationDateCalc = userActivationMillis.map(millis ⇒ new MutableDateCalc(millis))
+    val template = "%s(\n!! %s\n!! %s\n!! %s)"
+
+    template.format(
+      className,
+      latestIMEvent,
+      activationDateCalc,
+      roleHistory
     )
   }
 }
@@ -69,8 +88,8 @@ case class IMStateSnapshot(
 object IMStateSnapshot {
   def initial(imEvent: IMEventModel): IMStateSnapshot = {
     IMStateSnapshot(
-      imEvent.isActive,
       imEvent,
+      if(imEvent.isStateActive) Some(imEvent.occurredMillis) else None,
       RoleHistory.initial(imEvent.role, imEvent.occurredMillis))
   }
 }
index 1b09bee..760d523 100644 (file)
@@ -59,13 +59,22 @@ case class RoleHistory(
     TreeMap(roles.map(role ⇒ (role.timeslot, role)): _*)
   }
 
-  def copyWithRole(role: String, validFrom: Long) = {
+  def updateWithRole(role: String, validFrom: Long) = {
+    def fixValidTo(validFrom: Long, validTo: Long): Long = {
+      if(validTo == validFrom) {
+        // Since validTo is exclusive, make at least 1ms gap
+        validFrom + 1
+      } else {
+        validTo
+      }
+    }
+
     val newItems = roles match {
       case Nil ⇒
         RoleHistoryItem(role, validFrom) :: Nil
 
       case head :: tail ⇒
-        if(head.isStrictlyAfter(validFrom)) {
+        if(head.startsStrictlyAfter(validFrom)) {
           // must search history items to find where this fits in
           @tailrec
           def check(allChecked: ListBuffer[RoleHistoryItem],
@@ -74,16 +83,16 @@ case class RoleHistory(
 
             toCheck match {
               case Nil ⇒
-                allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                allChecked.append(RoleHistoryItem(role, validFrom, fixValidTo(validFrom, lastCheck.validFrom)))
                 allChecked.toList
 
               case toCheckHead :: toCheckTail ⇒
-                if(toCheckHead.isStrictlyAfter(validFrom)) {
+                if(toCheckHead.startsStrictlyAfter(validFrom)) {
                   allChecked.append(toCheckHead)
 
                   check(allChecked, toCheckHead, toCheckTail)
                 } else {
-                  allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                  allChecked.append(RoleHistoryItem(role, validFrom, fixValidTo(validFrom, lastCheck.validFrom)))
                   allChecked.toList
                 }
             }
@@ -94,7 +103,7 @@ case class RoleHistory(
           check(buffer, head, tail)
         } else {
           // assume head.validTo goes to infinity,
-          RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(validFrom) :: tail
+          RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(fixValidTo(head.validFrom, validFrom)) :: tail
         }
     }
 
index c20100c..59dca21 100644 (file)
@@ -87,7 +87,7 @@ case class RoleHistoryItem(
     validFrom <= time && time < validTo
   }
 
-  def isStrictlyAfter(time: Long) = {
+  def startsStrictlyAfter(time: Long) = {
     validFrom > time
   }
 
index bbad767..1be7e1d 100644 (file)
@@ -61,7 +61,7 @@ class ResourceInstanceSim (val resource: ResourceSim,
       uidGen.nextUID(),
       occurredMillis,
       receivedMillis,
-      owner.userId,
+      owner.userID,
       client.clientId,
       resource.name,
       instanceId,
index dd5cd0b..c369714 100644 (file)
@@ -44,7 +44,7 @@ import gr.grnet.aquarium.event.model.resource.ResourceEventModel
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-case class UserSim(userId: String, userCreationDate: Date, aquarium: AquariumSim) { userSelf ⇒
+case class UserSim(userID: String, userCreationDate: Date, aquarium: AquariumSim) { userSelf ⇒
   private[this]
   def resourceEventStore = aquarium.resourceEventStore
 
@@ -54,7 +54,7 @@ case class UserSim(userId: String, userCreationDate: Date, aquarium: AquariumSim
   }
 
   def myResourceEvents: List[ResourceEventModel] = {
-    resourceEventStore.findResourceEventsByUserId(userId)(None)
+    resourceEventStore.findResourceEventsByUserId(userID)(None)
   }
 
   def myResourceEventsByReceivedDate: List[ResourceEventModel] = {
index bd2713c..34dc9c5 100644 (file)
@@ -40,6 +40,7 @@ import gr.grnet.aquarium.util._
 import com.mongodb.DBObject
 import com.mongodb.util.JSON
 import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 
 /**
@@ -66,6 +67,23 @@ case class MongoDBIMEvent(
 
   def withDetails(newDetails: Map[String, String], newOccurredMillis: Long) =
     this.copy(details = newDetails, occurredMillis = newOccurredMillis)
+
+  /**
+   * Single-line rendering of all fields; the occurred and received
+   * timestamps are rendered through `MutableDateCalc`.
+   */
+  override def toString = {
+    val occurredText = new MutableDateCalc(occurredMillis).toString
+    val receivedText = new MutableDateCalc(receivedMillis).toString
+
+    val parts = List[Any](
+      shortClassNameOf(this),
+      id,
+      occurredText,
+      receivedText,
+      userID,
+      clientID,
+      isActive,
+      role,
+      eventVersion,
+      eventType,
+      details,
+      _id
+    )
+
+    "%s(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)".format(parts: _*)
+  }
 }
 
 object MongoDBIMEvent {
index 33b6eaf..4ab4e58 100644 (file)
@@ -47,8 +47,8 @@ import org.junit.{Assert, Ignore, Test}
 import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler}
 import gr.grnet.aquarium.{AquariumException}
 import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance}
-import gr.grnet.aquarium.computation.{UserState, BillingMonthInfo, UserStateComputations}
 import gr.grnet.aquarium.computation.reason.MonthlyBillingCalculation
+import gr.grnet.aquarium.computation.{UserStateBootstrappingData, UserState, BillingMonthInfo, UserStateComputations}
 
 
 /**
@@ -244,12 +244,20 @@ aquariumpolicy:
 
   val UserCKKL  = Aquarium.newUser("CKKL", UserCreationDate)
 
-  val InitialUserState = UserState.createInitialUserState(
-    userID = UserCKKL.userId,
-    userCreationMillis = UserCreationDate.getTime,
-    totalCredits = 0.0,
+//  val InitialUserState = UserState.createInitialUserState(
+//    userID = UserCKKL.userID,
+//    userCreationMillis = UserCreationDate.getTime,
+//    totalCredits = 0.0,
+//    initialRole = "default",
+//    initialAgreement = DSLAgreement.DefaultAgreementName
+//  )
+
+  val UserStateBootstrap = UserStateBootstrappingData(
+    userID = UserCKKL.userID,
+    userCreationMillis = UserCreationDate.getTime(),
     initialRole = "default",
-    initialAgreement = DSLAgreement.DefaultAgreementName
+    initialAgreement = DSLAgreement.DefaultAgreementName,
+    initialCredits = 0.0
   )
 
   // By convention
@@ -294,9 +302,8 @@ aquariumpolicy:
   private[this]
   def doFullMonthlyBilling(clog: ContextualLogger, billingMonthInfo: BillingMonthInfo) = {
     Computations.doFullMonthlyBilling(
-      UserCKKL.userId,
+      UserStateBootstrap,
       billingMonthInfo,
-      InitialUserState,
       DefaultResourcesMap,
       MonthlyBillingCalculation(billingMonthInfo),
       Some(clog)