WIP: ResourceEvent-related refactorings
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 7 May 2012 12:20:37 +0000 (15:20 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 7 May 2012 12:20:37 +0000 (15:20 +0300)
16 files changed:
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala
src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala
src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/util/package.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

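diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
index c5178a2..7677b23 100644
--- a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
+++ b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala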
@@ -45,6 +45,7 @@ import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
 import gr.grnet.aquarium.actor.message.admin.PingAllRequest
 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
 
 /**
  * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
@@ -70,19 +71,46 @@ class RouterActor extends ReflectiveRoleableActor {
     UserActorCache.get(userID) match {
       case Some(userActorRef) ⇒
         userActorRef
+
       case None ⇒
         _launchUserActor(userID)
     }
   }
 
   private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
-    try {
-      _findOrCreateUserActor(userID) forward m
+    _findOrCreateUserActor(userID) forward m
+  }
+
+
+  /**
+   * Handles an exception that occurred while servicing a message.
+   *
+   * @param t
+   * The exception.
+   * @param servicingMessage
+   * The message that was being served while the exception happened.
+   * Note that the message can be `null`, in which case the exception
+   * is an NPE.
+   */
+  override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
+    logChainOfCauses(t)
+
+    def logIgnore(e: Throwable) = {
+      logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
+    }
+
+    t match {
+      case e: Error ⇒
+        throw e
+
+      case e: AquariumInternalError ⇒
+        logIgnore(e)
+
+      case e: AquariumException ⇒
+        logIgnore(e)
 
-    } catch { case t: Throwable ⇒
-      logger.error("While forwarding to user actor for userID = %s".format(userID), t)
-      // FIXME: We have a message that never gets to the user actor.
-      // FIXME: We should probably shut the user actor down.
+      case e: Throwable ⇒
+        logIgnore(e)
     }
   }
 
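diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
index b5a22a0..5b73185 100644
--- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
+++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala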
@@ -107,7 +107,9 @@ class UserActor extends ReflectiveRoleableActor {
     val now = TimeHelpers.nowMillis()
 
     val imEvent = event.imEvent
-    val isUpdate = if(_haveIMState) {
+    val hadIMState = _haveIMState
+
+    if(hadIMState) {
       val newOccurredMillis = imEvent.occurredMillis
       val currentOccurredMillis = this._imState.imEvent.occurredMillis
 
@@ -119,14 +121,10 @@ class UserActor extends ReflectiveRoleableActor {
 
         return
       }
-
-      true
-    } else {
-      false
     }
 
     this._imState = IMStateSnapshot(imEvent, now)
-    DEBUG("%s %s", if(isUpdate) "Update" else "Set", shortClassNameOf(this._imState))
+    DEBUG("%s %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState))
   }
 
   def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
@@ -142,6 +140,9 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+    val rcEvent = event.rcEvent
+
+    logger.info("Got\n{}", rcEvent.toJsonString)
   }
 
 
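diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala
index 6a10e71..b376bff 100644
--- a/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala
+++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala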
@@ -43,10 +43,10 @@ import java.util.Date
 import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
 import gr.grnet.aquarium.util.{ContextualLogger, CryptoUtils, Loggable}
 import gr.grnet.aquarium.store.PolicyStore
-import gr.grnet.aquarium.AquariumException
 import gr.grnet.aquarium.event.{WalletEntry}
 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
 import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
  * A timeslot together with the algorithm and unit price that apply for this particular timeslot.
@@ -141,7 +141,7 @@ trait Accounting extends DSLUtils with Loggable {
                                 dslResource: DSLResource,
                                 policiesByTimeslot: Map[Timeslot, DSLPolicy],
                                 agreementNamesByTimeslot: Map[Timeslot, String],
-                                clogOpt: Option[ContextualLogger] = None): Maybe[List[Chargeslot]] = Maybe {
+                                clogOpt: Option[ContextualLogger] = None): List[Chargeslot] = {
 
     val clog = ContextualLogger.fromOther(clogOpt, logger, "computeInitialChargeslots()")
 //    clog.begin()
@@ -254,7 +254,7 @@ trait Accounting extends DSLUtils with Loggable {
    * Compute the charge slots generated by a particular resource event.
    *
    */
-  def computeFullChargeslots(previousResourceEventM: Maybe[ResourceEventModel],
+  def computeFullChargeslots(previousResourceEventOpt: Option[ResourceEventModel],
                              currentResourceEvent: ResourceEventModel,
                              oldCredits: Double,
                              oldTotalAmount: Double,
@@ -264,7 +264,7 @@ trait Accounting extends DSLUtils with Loggable {
                              agreementNamesByTimeslot: Map[Timeslot, String],
                              algorithmCompiler: CostPolicyAlgorithmCompiler,
                              policyStore: PolicyStore,
-                             clogOpt: Option[ContextualLogger] = None): Maybe[(Timeslot, List[Chargeslot])] = Maybe {
+                             clogOpt: Option[ContextualLogger] = None): (Timeslot, List[Chargeslot]) = {
 
     val clog = ContextualLogger.fromOther(clogOpt, logger, "computeFullChargeslots()")
 //    clog.begin()
@@ -277,9 +277,9 @@ trait Accounting extends DSLUtils with Loggable {
     val (referenceTimeslot, relevantPolicies, previousValue) = costPolicy.needsPreviousEventForCreditAndAmountCalculation match {
       // We need a previous event
       case true ⇒
-        previousResourceEventM match {
+        previousResourceEventOpt match {
           // We have a previous event
-          case Just(previousResourceEvent) ⇒
+          case Some(previousResourceEvent) ⇒
 //            clog.debug("Have previous event")
 //            clog.debug("previousValue = %s", previousResourceEvent.value)
 
@@ -294,16 +294,10 @@ trait Accounting extends DSLUtils with Loggable {
             (referenceTimeslot, relevantPolicies, previousResourceEvent.value)
 
           // We do not have a previous event
-          case NoVal ⇒
+          case None ⇒
             throw new AquariumException(
               "Unable to charge. No previous event given for %s".
                 format(currentResourceEvent.toDebugString()))
-
-          // We could not obtain a previous event
-          case failed @ Failed(e) ⇒
-            throw new AquariumException(
-              "Unable to charge. Could not obtain previous event for %s".
-                format(currentResourceEvent.toDebugString()), e)
         }
 
       // We do not need a previous event
@@ -318,23 +312,21 @@ trait Accounting extends DSLUtils with Loggable {
 //        clog.debug("referenceTimeslot = %s".format(referenceTimeslot))
 
 //        clog.debug("Calling policyStore.loadValidPolicyEntryAt(%s)", new MutableDateCalc(occurredMillis))
-        val relevantPolicyM = policyStore.loadValidPolicyAt(occurredMillis, dsl)
+        val relevantPolicyOpt = policyStore.loadValidPolicyAt(occurredMillis, dsl)
 //        clog.debug("  ==> relevantPolicyM = %s", relevantPolicyM)
 
-        val relevantPolicies = relevantPolicyM match {
-          case Just(relevantPolicy) ⇒
+        val relevantPolicies = relevantPolicyOpt match {
+          case Some(relevantPolicy) ⇒
             Map(referenceTimeslot -> relevantPolicy)
-          case NoVal ⇒
-            throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot))
-          case failed @ Failed(e) ⇒
-            throw new AquariumException("No relevant policy found for %s".format(referenceTimeslot), e)
 
+          case None ⇒
+            throw new AquariumInternalError("No relevant policy found for %s".format(referenceTimeslot))
         }
 
         (referenceTimeslot, relevantPolicies, previousValue)
     }
 
-    val initialChargeslotsM = computeInitialChargeslots(
+    val initialChargeslots = computeInitialChargeslots(
       referenceTimeslot,
       dslResource,
       relevantPolicies,
@@ -342,60 +334,28 @@ trait Accounting extends DSLUtils with Loggable {
       Some(clog)
     )
 
-    val fullChargeslotsM = initialChargeslotsM.map { chargeslots ⇒
-      chargeslots.map {
-        case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
-          val execAlgorithmM = algorithmCompiler.compile(algorithmDefinition)
-          execAlgorithmM match {
-            case NoVal ⇒
-              throw new AquariumException("Could not compile algorithm %s".format(algorithmDefinition))
-
-            case failed @ Failed(e) ⇒
-              failed.throwMe
-
-            case Just(execAlgorithm) ⇒
-              val valueMap = costPolicy.makeValueMap(
-                oldCredits,
-                oldTotalAmount,
-                newTotalAmount,
-                stopMillis - startMillis,
-                previousValue,
-                currentResourceEvent.value,
-                unitPrice
-              )
+    val fullChargeslots = initialChargeslots.map {
+      case chargeslot @ Chargeslot(startMillis, stopMillis, algorithmDefinition, unitPrice, _) ⇒
+        val execAlgorithm = algorithmCompiler.compile(algorithmDefinition)
+        val valueMap = costPolicy.makeValueMap(
+          oldCredits,
+          oldTotalAmount,
+          newTotalAmount,
+          stopMillis - startMillis,
+          previousValue,
+          currentResourceEvent.value,
+          unitPrice
+        )
 
 //              clog.debug("execAlgorithm = %s", execAlgorithm)
-              clog.debugMap("valueMap", valueMap, 1)
-
-              // This is it
-              val creditsM = execAlgorithm.apply(valueMap)
-
-              creditsM match {
-                case NoVal ⇒
-                  throw new AquariumException(
-                    "Could not compute credits for resource %s during %s".
-                      format(dslResource.name, Timeslot(new Date(startMillis), new Date(stopMillis))))
-
-                case failed @ Failed(e) ⇒
-                  failed.throwMe
+        clog.debugMap("valueMap", valueMap, 1)
 
-                case Just(credits) ⇒
-                  chargeslot.copy(computedCredits = Some(credits))
-              }
-          }
-      }
-    }
-
-    val result = fullChargeslotsM match {
-      case Just(fullChargeslots) ⇒
-        referenceTimeslot -> fullChargeslots
-      case NoVal ⇒
-        null
-      case failed @ Failed(e) ⇒
-        failed.throwMe
+        // This is it
+        val credits = execAlgorithm.apply(valueMap)
+        chargeslot.copy(computedCredits = Some(credits))
     }
 
-//    clog.end()
+    val result = referenceTimeslot -> fullChargeslots
 
     result
   }
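diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala
index 3685335..85de5eb 100644
--- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala
+++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/CostPolicyAlgorithmCompiler.scala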
@@ -51,5 +51,5 @@ trait CostPolicyAlgorithmCompiler {
    * @param definition the textual representation of the algorithm
    * @return the executable form of the algorithm
    */
-  def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm]
+  def compile(definition: String): ExecutableCostPolicyAlgorithm
 }
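diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala
index 49dbef2..520e61a 100644
--- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala
+++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/ExecutableCostPolicyAlgorithm.scala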
@@ -44,4 +44,4 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicyVar
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] => Maybe[Double])
+trait ExecutableCostPolicyAlgorithm extends (Map[DSLCostPolicyVar, Any] ⇒ Double)
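diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala
index 23e8c9f..1f4448a 100644
--- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala
+++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleCostPolicyAlgorithmCompiler.scala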
@@ -51,7 +51,7 @@ object SimpleCostPolicyAlgorithmCompiler extends CostPolicyAlgorithmCompiler {
    * @param definition the textual representation of the algorithm
    * @return the executable form of the algorithm
    */
-  def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = {
-    Just(SimpleExecutableCostPolicyAlgorithm)
+  def compile(definition: String): ExecutableCostPolicyAlgorithm = {
+    SimpleExecutableCostPolicyAlgorithm
   }
 }
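diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala
index 953303a..6b82167 100644
--- a/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala
+++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/algorithm/SimpleExecutableCostPolicyAlgorithm.scala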
@@ -50,7 +50,7 @@ object SimpleExecutableCostPolicyAlgorithm extends ExecutableCostPolicyAlgorithm
   @inline private[this]
   def hrs(millis: Double) = millis / 1000 / 60 / 60
 
-  def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe {
+  def apply(vars: Map[DSLCostPolicyVar, Any]): Double = {
     vars.apply(DSLCostPolicyNameVar) match {
       case DSLCostPolicyNames.continuous ⇒
         val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double]
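diff --git a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala
index ed80f65..a9a58c4 100644
--- a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala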
@@ -71,18 +71,13 @@ trait PolicyStore {
       yield (timeslot, dsl.parse(policyEntry.policyYAML))
   }
   
-  def loadValidPolicyEntryAt(atMillis: Long): Maybe[PolicyEntry] = Maybe {
+  def loadValidPolicyEntryAt(atMillis: Long): Option[PolicyEntry] = {
     loadPolicyEntriesAfter(0L).find { policyEntry ⇒
       policyEntry.fromToTimeslot.containsTimeInMillis(atMillis)
-    } match {
-      case Some(policyEntry) ⇒
-        policyEntry
-      case None ⇒
-        null // Do not worry, this will be transformed to a NoVal by the Maybe polymorphic constructor
     }
   }
   
-  def loadValidPolicyAt(atMillis: Long, dsl: DSL): Maybe[DSLPolicy] = {
+  def loadValidPolicyAt(atMillis: Long, dsl: DSL): Option[DSLPolicy] = {
     loadValidPolicyEntryAt(atMillis).map(policyEntry ⇒ dsl.parse(policyEntry.policyYAML))
   }
 
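diff --git a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
index f263dd9..4eb6016 100644
--- a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala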
@@ -75,7 +75,7 @@ trait ResourceEventStore {
   /**
    * Count and return the number of "out of sync" events for a billing month.
    */
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long]
+  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long
 
   /**
    * Finds all relevant resource events for the billing period.
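diff --git a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
index f008133..2760a9e 100644
--- a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala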
@@ -53,10 +53,6 @@ trait UserStateStore {
    */
   def insertUserState(userState: UserState): UserState
 
-  def insertUserState2(userState: UserState): Maybe[UserState] = {
-    Maybe { insertUserState(userState) }
-  }
-
   /**
    * Find a state by user ID
    */
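diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
index 2300ee3..985a27e 100644
--- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala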
@@ -273,7 +273,7 @@ class MemStore extends UserStateStore
     }.toList
   }
 
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = Maybe {
+  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
     _resourceEvents.filter { case ev ⇒
       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
       // months
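diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
index 1b46a3b..a8671ec 100644
--- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala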
@@ -184,11 +184,9 @@ class MongoDBStore(
     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
   }
   
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
-    Maybe {
-      // FIXME: Implement
-      0L
-    }
+  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
+    // FIXME: Implement
+    0L
   }
 
   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
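diff --git a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
index 55753ab..bd2d5b9 100644
--- a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
+++ b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala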
@@ -292,11 +292,11 @@ case class LatestResourceEventsWorker(latestEventsMap: FullMutableResourceTypeMa
     latestEventsMap((resourceEvent.resource, resourceEvent.instanceID)) = resourceEvent
   }
   
-  def findResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
-    findFromMapAsMaybe(latestEventsMap, (resource, instanceId))
+  def findResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
+    latestEventsMap.get((resource, instanceId))
   }
 
-  def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+  def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
     findAndRemoveFromMap(latestEventsMap, (resource, instanceId))
   }
 
@@ -360,7 +360,7 @@ case class ImplicitlyIssuedResourceEventsWorker(implicitlyIssuedEventsMap: FullM
   def toImmutableSnapshot(snapshotTime: Long) =
     ImplicitlyIssuedResourceEventsSnapshot(toList, snapshotTime)
 
-  def findAndRemoveResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+  def findAndRemoveResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
     findAndRemoveFromMap(implicitlyIssuedEventsMap, (resource, instanceId))
   }
 
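diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
index ec34ac6..c015377 100644
--- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
+++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala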
@@ -37,17 +37,15 @@ package gr.grnet.aquarium.user
 
 
 import scala.collection.mutable
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
 import gr.grnet.aquarium.logic.accounting.Accounting
 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.event.{NewWalletEntry}
+import gr.grnet.aquarium.event.NewWalletEntry
 import gr.grnet.aquarium.event.resource.ResourceEventModel
 import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
-import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
 import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
@@ -141,7 +139,7 @@ class UserStateComputations extends Loggable {
                                        accounting: Accounting,
                                        algorithmCompiler: CostPolicyAlgorithmCompiler,
                                        calculationReason: UserStateChangeReason,
-                                       clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = {
+                                       clogOpt: Option[ContextualLogger] = None): UserState = {
 
     val clog = ContextualLogger.fromOther(
       clogOpt,
@@ -149,7 +147,7 @@ class UserStateComputations extends Loggable {
       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
     clog.begin()
 
-    def doCompute: Maybe[UserState] = {
+    def doCompute: UserState = {
       doFullMonthlyBilling(
         userId,
         billingMonthInfo,
@@ -176,84 +174,56 @@ class UserStateComputations extends Loggable {
 
       // NOTE: Reason here will be: InitialUserStateSetup$
       val initialUserState0 = createInitialUserStateFrom(currentUserState)
-      val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
+      val initialUserState1 = userStateStore.insertUserState(initialUserState0)
 
-      clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
+      clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
       clog.end()
 
-      initialUserStateM
+      initialUserState1
     } else {
       // Ask DB cache for the latest known user state for this billing period
-      val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
+      val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
         userId,
         billingMonthInfo.year,
-        billingMonthInfo.month) match {
+        billingMonthInfo.month)
 
-        case Some(latestUserState) ⇒
-          latestUserState
+      latestUserStateOpt match {
         case None ⇒
-          null
-      }}
-
-      latestUserStateM match {
-        case NoVal ⇒
           // Not found, must compute
           clog.debug("No user state found from cache, will have to (re)compute")
           val result = doCompute
           clog.end()
           result
 
-        case failed @ Failed(e) ⇒
-          clog.warn("Failure while quering cache for user state: %s", failed)
-          clog.end()
-          failed
-
-        case Just(latestUserState) ⇒
+        case Some(latestUserState) ⇒
           // Found a "latest" user state but need to see if it is indeed the true and one latest.
           // For this reason, we must count the events again.
          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
-         val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
            userId,
            billingMonthStartMillis,
            billingMonthStopMillis)
 
-         actualOOSEventsCounterM match {
-           case NoVal ⇒
-             val errMsg = "No counter computed for out of sync events. Should at least be zero."
-             clog.warn(errMsg)
-             val result = Failed(new AquariumException(errMsg))
+         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+         counterDiff match {
+           // ZERO, we are OK!
+           case 0 ⇒
+             // NOTE: Keep the caller's calculation reason
+             latestUserState.copyForChangeReason(calculationReason)
+
+           // We had more, so must recompute
+           case n if n > 0 ⇒
+             clog.debug(
+               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+             val result = doCompute
              clog.end()
              result
 
-           case failed @ Failed(_) ⇒
-             clog.warn("Failure while querying for out of sync events: %s", failed)
-             clog.end()
-             failed
-
-           case Just(actualOOSEventsCounter) ⇒
-             val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
-             counterDiff match {
-               // ZERO, we are OK!
-               case 0 ⇒
-                 // NOTE: Keep the caller's calculation reason
-                 Just(latestUserState.copyForChangeReason(calculationReason))
-
-               // We had more, so must recompute
-               case n if n > 0 ⇒
-                 clog.debug(
-                   "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
-                 val result = doCompute
-                 clog.end()
-                 result
-
-               // We had less????
-               case n if n < 0 ⇒
-                 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
-                 clog.warn(errMsg)
-                 val result = Failed(new AquariumException(errMsg))
-                 clog.end()
-                 result
-             }
+           // We had less????
+           case n if n < 0 ⇒
+             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+             clog.warn(errMsg)
+             throw new AquariumException(errMsg)
          }
       }
     }
@@ -307,15 +277,15 @@ class UserStateComputations extends Loggable {
           // The resource event is billable
           // Find the previous event.
           // This is (potentially) needed to calculate new credit amount and new resource instance amount
-          val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
-          clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
+          val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+          clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
 
-          val havePreviousResourceEvent = previousResourceEventM.isJust
+          val havePreviousResourceEvent = previousResourceEventOpt.isDefined
           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
             // This must be the first resource event of its kind, ever.
             // TODO: We should normally check the DB to verify the claim (?)
-            clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+            clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
             userStateWorker.updateIgnored(currentResourceEvent)
           } else {
             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
@@ -332,8 +302,8 @@ class UserStateComputations extends Loggable {
             val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
 
             //              clog.debug("Computing full chargeslots")
-            val fullChargeslotsM = accounting.computeFullChargeslots(
-              previousResourceEventM,
+            val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+              previousResourceEventOpt,
               currentResourceEvent,
               oldCredits,
               oldAmount,
@@ -347,56 +317,46 @@ class UserStateComputations extends Loggable {
             )
 
             // We have the chargeslots, let's associate them with the current event
-            fullChargeslotsM match {
-              case Just((referenceTimeslot, fullChargeslots)) ⇒
-                if(fullChargeslots.length == 0) {
-                  // At least one chargeslot is required.
-                  throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
-                }
-                clog.debugSeq("fullChargeslots", fullChargeslots, 0)
-
-                // C. Compute new credit amount (based on the charge slots)
-                val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
-                val newCredits = oldCredits - newCreditsDiff
-
-                if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
-                  val newWalletEntry = NewWalletEntry(
-                    userStateWorker.userId,
-                    newCreditsDiff,
-                    oldCredits,
-                    newCredits,
-                    TimeHelpers.nowMillis(),
-                    referenceTimeslot,
-                    billingMonthInfo.year,
-                    billingMonthInfo.month,
-                    if(havePreviousResourceEvent)
-                      List(currentResourceEvent, justForSure(previousResourceEventM).get)
-                    else
-                      List(currentResourceEvent),
-                    fullChargeslots,
-                    dslResource,
-                    currentResourceEvent.isSynthetic
-                  )
-                  clog.debug("New %s", newWalletEntry)
-
-                  walletEntriesBuffer += newWalletEntry
-                } else {
-                  clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
-                }
-
-                _workingUserState = _workingUserState.copy(
-                  creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
-                  stateChangeCounter = _workingUserState.stateChangeCounter + 1,
-                  totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
-                )
-
-              case NoVal ⇒
-                // At least one chargeslot is required.
-                throw new AquariumException("No chargeslots computed")
-
-              case failed@Failed(e) ⇒
-                throw new AquariumException(e, "Error computing chargeslots")
+            if(fullChargeslots.length == 0) {
+              // At least one chargeslot is required.
+              throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
+            }
+            clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+            // C. Compute new credit amount (based on the charge slots)
+            val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+            val newCredits = oldCredits - newCreditsDiff
+
+            if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+              val newWalletEntry = NewWalletEntry(
+                userStateWorker.userId,
+                newCreditsDiff,
+                oldCredits,
+                newCredits,
+                TimeHelpers.nowMillis(),
+                referenceTimeslot,
+                billingMonthInfo.year,
+                billingMonthInfo.month,
+                if(havePreviousResourceEvent)
+                  List(currentResourceEvent, previousResourceEventOpt.get)
+                else
+                  List(currentResourceEvent),
+                fullChargeslots,
+                dslResource,
+                currentResourceEvent.isSynthetic
+              )
+              clog.debug("New %s", newWalletEntry)
+
+              walletEntriesBuffer += newWalletEntry
+            } else {
+              clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
             }
+
+            _workingUserState = _workingUserState.copy(
+              creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+              stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+              totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+            )
           }
         }
 
@@ -430,7 +390,7 @@ class UserStateComputations extends Loggable {
 
     var _workingUserState = startingUserState
 
-    for(currentResourceEvent <- resourceEvents) {
+    for(currentResourceEvent ← resourceEvents) {
 
       _workingUserState = processResourceEvent(
         _workingUserState,
@@ -457,7 +417,7 @@ class UserStateComputations extends Loggable {
                            accounting: Accounting,
                            algorithmCompiler: CostPolicyAlgorithmCompiler,
                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
-                           clogOpt: Option[ContextualLogger] = None): Maybe[UserState] = Maybe {
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
 
 
     val clog = ContextualLogger.fromOther(
@@ -468,7 +428,7 @@ class UserStateComputations extends Loggable {
 
     val clogSome = Some(clog)
 
-    val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
+    val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
       userId,
       billingMonthInfo.previousMonth,
       storeProvider,
@@ -480,14 +440,7 @@ class UserStateComputations extends Loggable {
       clogSome
     )
 
-    if(previousBillingMonthUserStateM.isNoVal) {
-      throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
-    }
-    if(previousBillingMonthUserStateM.isFailed) {
-      throw failedForSure(previousBillingMonthUserStateM).exception
-    }
-
-    val startingUserState = justForSure(previousBillingMonthUserStateM).get
+    val startingUserState = previousBillingMonthUserState
 
     val userStateStore = storeProvider.userStateStore
     val resourceEventStore = storeProvider.resourceEventStore
@@ -572,16 +525,9 @@ class UserStateComputations extends Loggable {
     clog.debug("calculationReason = %s", calculationReason)
 
     if(calculationReason.shouldStoreUserState) {
-      val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
-      storedUserStateM match {
-        case Just(storedUserState) ⇒
-          clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
-          _workingUserState = storedUserState
-        case NoVal ⇒
-          clog.warn("Could not store %s", _workingUserState)
-        case failed @ Failed(e) ⇒
-          clog.error(e, "Could not store %s", _workingUserState)
-      }
+      val storedUserState = userStateStore.insertUserState(_workingUserState)
+      clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+      _workingUserState = storedUserState
     }
 
     clog.debug("RETURN %s", _workingUserState)
@@ -625,21 +571,19 @@ case class UserStateWorker(userId: String,
    * @param instanceId
    * @return
    */
-  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
     // implicitly issued events are checked first
     implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-      case just @ Just(_) ⇒
-        just
-      case NoVal ⇒
+      case some @ Some(_) ⇒ // matched among the implicitly issued start events; hand it back unchanged
+        some
+      case None ⇒ // no implicit match, so fall back to the explicit previous events
         // explicit previous resource events are checked second
         previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-          case just @ Just(_) ⇒
-            just
-          case noValOrFailed ⇒
-            noValOrFailed
+          case some @ Some(_) ⇒ // matched among the explicit previous events
+            some
+          case _ ⇒ // neither collection had a match for (resource, instanceId)
+            None
         }
-      case failed ⇒
-        failed
     }
   }
 
@@ -691,8 +635,8 @@ case class UserStateWorker(userId: String,
     def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
       val resourceEvents = map.valuesIterator
       for {
-        resourceEvent <- resourceEvents
-        dslResource   <- resourcesMap.findResource(resourceEvent.safeResource)
+        resourceEvent ← resourceEvents
+        dslResource   ← resourcesMap.findResource(resourceEvent.safeResource)
         costPolicy    =  dslResource.costPolicy
       } {
         if(costPolicy.supportsImplicitEvents) {
index cd6b2a4..e4e94b0 100644 (file)
@@ -146,14 +146,8 @@ package object util {
   }
 
   @inline
-  def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Maybe[B] = Maybe {
-    map.get(key) match {
-      case Some(value) ⇒
-        map -= key
-        value
-      case None ⇒
-        null.asInstanceOf[B]
-    }
+  def findAndRemoveFromMap[A, B <: AnyRef](map: scala.collection.mutable.Map[A, B], key: A): Option[B] = {
+    map.remove(key) // mutable.Map#remove drops the entry and yields its old value as Some, or None if the key was absent
   }
 
   // Dear scalac. Optimize this.
index 7335f2f..615424f 100644 (file)
@@ -149,7 +149,7 @@ aquariumpolicy:
     @inline private[this]
     def hrs(millis: Double) = millis / 1000 / 60 / 60
 
-    def apply(vars: Map[DSLCostPolicyVar, Any]): Maybe[Double] = Maybe {
+    def apply(vars: Map[DSLCostPolicyVar, Any]): Double = {
       vars.apply(DSLCostPolicyNameVar) match {
         case DSLCostPolicyNames.continuous ⇒
           val unitPrice = vars(DSLUnitPriceVar).asInstanceOf[Double]
@@ -194,8 +194,8 @@ aquariumpolicy:
   }
 
   val DefaultCompiler  = new CostPolicyAlgorithmCompiler {
-    def compile(definition: String): Maybe[ExecutableCostPolicyAlgorithm] = {
-      Just(DefaultAlgorithm)
+    def compile(definition: String): ExecutableCostPolicyAlgorithm = {
+      DefaultAlgorithm // `definition` is not inspected; the same DefaultAlgorithm is returned for every input
     }
   }
   //val DefaultAlgorithm = justForSure(DefaultCompiler.compile("")).get // hardcoded since we know exactly what this is
@@ -349,9 +349,8 @@ aquariumpolicy:
 
     showResourceEvents(clog)
 
-    val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
-    val userState = justUserState(userStateM)
-    
+    val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
+
     showUserState(clog, userState)
 
     expectCredits(clog, credits, userState)
@@ -379,8 +378,7 @@ aquariumpolicy:
 
     showResourceEvents(clog)
 
-    val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
-    val userState = justUserState(userStateM)
+    val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
 
     showUserState(clog, userState)
 
@@ -410,8 +408,7 @@ aquariumpolicy:
 
     showResourceEvents(clog)
 
-    val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
-    val userState = justUserState(userStateM)
+    val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
 
     showUserState(clog, userState)
 
@@ -462,8 +459,8 @@ aquariumpolicy:
 
     clog.debugMap("DefaultResourcesMap", DefaultResourcesMap.map, 1)
 
-    val userStateM = doFullMonthlyBilling(clog, BillingMonthInfoJan)
-    val userState = justUserState(userStateM)
+    val userState = doFullMonthlyBilling(clog, BillingMonthInfoJan)
+
     showUserState(clog, userState)
 
     clog.end()