WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
index 24fa20b..c015377 100644 (file)
@@ -37,17 +37,15 @@ package gr.grnet.aquarium.user
 
 
 import scala.collection.mutable
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
 import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
 import gr.grnet.aquarium.logic.accounting.Accounting
 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.event.{NewWalletEntry}
+import gr.grnet.aquarium.event.NewWalletEntry
 import gr.grnet.aquarium.event.resource.ResourceEventModel
 import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
-import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User
 import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
@@ -141,15 +139,15 @@ class UserStateComputations extends Loggable {
                                        accounting: Accounting,
                                        algorithmCompiler: CostPolicyAlgorithmCompiler,
                                        calculationReason: UserStateChangeReason,
-                                       contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
+                                       clogOpt: Option[ContextualLogger] = None): UserState = {
 
     val clog = ContextualLogger.fromOther(
-      contextualLogger,
+      clogOpt,
       logger,
       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
     clog.begin()
 
-    def doCompute: Maybe[UserState] = {
+    def doCompute: UserState = {
       doFullMonthlyBilling(
         userId,
         billingMonthInfo,
@@ -159,7 +157,7 @@ class UserStateComputations extends Loggable {
         accounting,
         algorithmCompiler,
         calculationReason,
-        Just(clog))
+        Some(clog))
     }
 
     val userStateStore = storeProvider.userStateStore
@@ -176,84 +174,56 @@ class UserStateComputations extends Loggable {
 
       // NOTE: Reason here will be: InitialUserStateSetup$
       val initialUserState0 = createInitialUserStateFrom(currentUserState)
-      val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
+      val initialUserState1 = userStateStore.insertUserState(initialUserState0)
 
-      clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
+      clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
       clog.end()
 
-      initialUserStateM
+      initialUserState1
     } else {
       // Ask DB cache for the latest known user state for this billing period
-      val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
+      val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
         userId,
         billingMonthInfo.year,
-        billingMonthInfo.month) match {
+        billingMonthInfo.month)
 
-        case Some(latestUserState) ⇒
-          latestUserState
+      latestUserStateOpt match {
         case None ⇒
-          null
-      }}
-
-      latestUserStateM match {
-        case NoVal ⇒
           // Not found, must compute
           clog.debug("No user state found from cache, will have to (re)compute")
           val result = doCompute
           clog.end()
           result
 
-        case failed @ Failed(e) ⇒
-          clog.warn("Failure while quering cache for user state: %s", failed)
-          clog.end()
-          failed
-
-        case Just(latestUserState) ⇒
+        case Some(latestUserState) ⇒
           // Found a "latest" user state but need to see if it is indeed the true and one latest.
           // For this reason, we must count the events again.
          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
-         val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
            userId,
            billingMonthStartMillis,
            billingMonthStopMillis)
 
-         actualOOSEventsCounterM match {
-           case NoVal ⇒
-             val errMsg = "No counter computed for out of sync events. Should at least be zero."
-             clog.warn(errMsg)
-             val result = Failed(new AquariumException(errMsg))
+         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+         counterDiff match {
+           // ZERO, we are OK!
+           case 0 ⇒
+             // NOTE: Keep the caller's calculation reason
+             latestUserState.copyForChangeReason(calculationReason)
+
+           // We had more, so must recompute
+           case n if n > 0 ⇒
+             clog.debug(
+               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+             val result = doCompute
              clog.end()
              result
 
-           case failed @ Failed(_) ⇒
-             clog.warn("Failure while querying for out of sync events: %s", failed)
-             clog.end()
-             failed
-
-           case Just(actualOOSEventsCounter) ⇒
-             val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
-             counterDiff match {
-               // ZERO, we are OK!
-               case 0 ⇒
-                 // NOTE: Keep the caller's calculation reason
-                 Just(latestUserState.copyForChangeReason(calculationReason))
-
-               // We had more, so must recompute
-               case n if n > 0 ⇒
-                 clog.debug(
-                   "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
-                 val result = doCompute
-                 clog.end()
-                 result
-
-               // We had less????
-               case n if n < 0 ⇒
-                 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
-                 clog.warn(errMsg)
-                 val result = Failed(new AquariumException(errMsg))
-                 clog.end()
-                 result
-             }
+           // We had less????
+           case n if n < 0 ⇒
+             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+             clog.warn(errMsg)
+             throw new AquariumException(errMsg)
          }
       }
     }
@@ -273,9 +243,9 @@ class UserStateComputations extends Loggable {
                            billingMonthInfo: BillingMonthInfo,
                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
                            algorithmCompiler: CostPolicyAlgorithmCompiler,
-                           clogM: Maybe[ContextualLogger] = NoVal): UserState = {
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
 
-    val clog = ContextualLogger.fromOther(clogM, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
 
     var _workingUserState = startingUserState
 
@@ -307,15 +277,15 @@ class UserStateComputations extends Loggable {
           // The resource event is billable
           // Find the previous event.
           // This is (potentially) needed to calculate new credit amount and new resource instance amount
-          val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
-          clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
+          val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+          clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
 
-          val havePreviousResourceEvent = previousResourceEventM.isJust
+          val havePreviousResourceEvent = previousResourceEventOpt.isDefined
           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
             // This must be the first resource event of its kind, ever.
             // TODO: We should normally check the DB to verify the claim (?)
-            clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+            clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
             userStateWorker.updateIgnored(currentResourceEvent)
           } else {
             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
@@ -332,8 +302,8 @@ class UserStateComputations extends Loggable {
             val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
 
             //              clog.debug("Computing full chargeslots")
-            val fullChargeslotsM = accounting.computeFullChargeslots(
-              previousResourceEventM,
+            val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+              previousResourceEventOpt,
               currentResourceEvent,
               oldCredits,
               oldAmount,
@@ -343,60 +313,50 @@ class UserStateComputations extends Loggable {
               alltimeAgreements,
               algorithmCompiler,
               policyStore,
-              Just(clog)
+              Some(clog)
             )
 
             // We have the chargeslots, let's associate them with the current event
-            fullChargeslotsM match {
-              case Just((referenceTimeslot, fullChargeslots)) ⇒
-                if(fullChargeslots.length == 0) {
-                  // At least one chargeslot is required.
-                  throw new AquariumException("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
-                }
-                clog.debugSeq("fullChargeslots", fullChargeslots, 0)
-
-                // C. Compute new credit amount (based on the charge slots)
-                val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
-                val newCredits = oldCredits - newCreditsDiff
-
-                if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
-                  val newWalletEntry = NewWalletEntry(
-                    userStateWorker.userId,
-                    newCreditsDiff,
-                    oldCredits,
-                    newCredits,
-                    TimeHelpers.nowMillis(),
-                    referenceTimeslot,
-                    billingMonthInfo.year,
-                    billingMonthInfo.month,
-                    if(havePreviousResourceEvent)
-                      List(currentResourceEvent, justForSure(previousResourceEventM).get)
-                    else
-                      List(currentResourceEvent),
-                    fullChargeslots,
-                    dslResource,
-                    currentResourceEvent.isSynthetic
-                  )
-                  clog.debug("New %s", newWalletEntry)
-
-                  walletEntriesBuffer += newWalletEntry
-                } else {
-                  clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
-                }
-
-                _workingUserState = _workingUserState.copy(
-                  creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
-                  stateChangeCounter = _workingUserState.stateChangeCounter + 1,
-                  totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
-                )
-
-              case NoVal ⇒
-                // At least one chargeslot is required.
-                throw new AquariumException("No chargeslots computed")
-
-              case failed@Failed(e) ⇒
-                throw new AquariumException(e, "Error computing chargeslots")
+            if(fullChargeslots.length == 0) {
+              // At least one chargeslot is required.
+              throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
             }
+            clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+            // C. Compute new credit amount (based on the charge slots)
+            val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+            val newCredits = oldCredits - newCreditsDiff
+
+            if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+              val newWalletEntry = NewWalletEntry(
+                userStateWorker.userId,
+                newCreditsDiff,
+                oldCredits,
+                newCredits,
+                TimeHelpers.nowMillis(),
+                referenceTimeslot,
+                billingMonthInfo.year,
+                billingMonthInfo.month,
+                if(havePreviousResourceEvent)
+                  List(currentResourceEvent, previousResourceEventOpt.get)
+                else
+                  List(currentResourceEvent),
+                fullChargeslots,
+                dslResource,
+                currentResourceEvent.isSynthetic
+              )
+              clog.debug("New %s", newWalletEntry)
+
+              walletEntriesBuffer += newWalletEntry
+            } else {
+              clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
+            }
+
+            _workingUserState = _workingUserState.copy(
+              creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+              stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+              totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+            )
           }
         }
 
@@ -426,11 +386,11 @@ class UserStateComputations extends Loggable {
                             billingMonthInfo: BillingMonthInfo,
                             walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
                             algorithmCompiler: CostPolicyAlgorithmCompiler,
-                            clogM: Maybe[ContextualLogger] = NoVal): UserState = {
+                            clogOpt: Option[ContextualLogger] = None): UserState = {
 
     var _workingUserState = startingUserState
 
-    for(currentResourceEvent <- resourceEvents) {
+    for(currentResourceEvent ← resourceEvents) {
 
       _workingUserState = processResourceEvent(
         _workingUserState,
@@ -441,7 +401,7 @@ class UserStateComputations extends Loggable {
         billingMonthInfo,
         walletEntriesBuffer,
         algorithmCompiler,
-        clogM
+        clogOpt
       )
     }
 
@@ -457,18 +417,18 @@ class UserStateComputations extends Loggable {
                            accounting: Accounting,
                            algorithmCompiler: CostPolicyAlgorithmCompiler,
                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
-                           contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
 
 
     val clog = ContextualLogger.fromOther(
-      contextualLogger,
+      clogOpt,
       logger,
       "doFullMonthlyBilling(%s)", billingMonthInfo)
     clog.begin()
 
-    val clogJ = Just(clog)
+    val clogSome = Some(clog)
 
-    val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
+    val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
       userId,
       billingMonthInfo.previousMonth,
       storeProvider,
@@ -477,17 +437,10 @@ class UserStateComputations extends Loggable {
       accounting,
       algorithmCompiler,
       calculationReason.forPreviousBillingMonth,
-      clogJ
+      clogSome
     )
 
-    if(previousBillingMonthUserStateM.isNoVal) {
-      throw new AquariumException("Could not calculate initial user state for billing %s".format(billingMonthInfo))
-    }
-    if(previousBillingMonthUserStateM.isFailed) {
-      throw failedForSure(previousBillingMonthUserStateM).exception
-    }
-
-    val startingUserState = justForSure(previousBillingMonthUserStateM).get
+    val startingUserState = previousBillingMonthUserState
 
     val userStateStore = storeProvider.userStateStore
     val resourceEventStore = storeProvider.resourceEventStore
@@ -522,7 +475,7 @@ class UserStateComputations extends Loggable {
       billingMonthInfo,
       newWalletEntries,
       algorithmCompiler,
-      clogJ
+      clogSome
     )
 
     // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
@@ -556,7 +509,7 @@ class UserStateComputations extends Loggable {
       billingMonthInfo,
       newWalletEntries,
       algorithmCompiler,
-      clogJ
+      clogSome
     )
 
     val lastUpdateTime = TimeHelpers.nowMillis()
@@ -572,16 +525,9 @@ class UserStateComputations extends Loggable {
     clog.debug("calculationReason = %s", calculationReason)
 
     if(calculationReason.shouldStoreUserState) {
-      val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
-      storedUserStateM match {
-        case Just(storedUserState) ⇒
-          clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
-          _workingUserState = storedUserState
-        case NoVal ⇒
-          clog.warn("Could not store %s", _workingUserState)
-        case failed @ Failed(e) ⇒
-          clog.error(e, "Could not store %s", _workingUserState)
-      }
+      val storedUserState = userStateStore.insertUserState(_workingUserState)
+      clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+      _workingUserState = storedUserState
     }
 
     clog.debug("RETURN %s", _workingUserState)
@@ -625,21 +571,19 @@ case class UserStateWorker(userId: String,
    * @param instanceId
    * @return
    */
-  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEventModel] = {
+  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
     // implicitly issued events are checked first
     implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-      case just @ Just(_) ⇒
-        just
-      case NoVal ⇒
+      case some @ Some(_) ⇒
+        some
+      case None ⇒
         // explicit previous resource events are checked second
         previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-          case just @ Just(_) ⇒
-            just
-          case noValOrFailed ⇒
-            noValOrFailed
+          case some @ Some(_) ⇒
+            some
+          case _ ⇒
+            None
         }
-      case failed ⇒
-        failed
     }
   }
 
@@ -691,8 +635,8 @@ case class UserStateWorker(userId: String,
     def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
       val resourceEvents = map.valuesIterator
       for {
-        resourceEvent <- resourceEvents
-        dslResource   <- resourcesMap.findResource(resourceEvent.safeResource)
+        resourceEvent ← resourceEvents
+        dslResource   ← resourcesMap.findResource(resourceEvent.safeResource)
         costPolicy    =  dslResource.costPolicy
       } {
         if(costPolicy.supportsImplicitEvents) {