Preparing the move to master
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
index b3cc55e..e344c05 100644 (file)
 
 package gr.grnet.aquarium.user
 
-import scala.collection.mutable
 
 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.logic.accounting.Accounting
-import gr.grnet.aquarium.util.date.DateCalculator
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy}
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.logic.events.ResourceEvent
 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, ResourceEventStore}
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
+import gr.grnet.aquarium.logic.accounting.Accounting
+import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
+import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
 
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 class UserStateComputations extends Loggable {
-  def createFirstUserState(userId: String, agreementName: String = "default") = {
+  def createFirstUserState(userId: String,
+                           millis: Long = TimeHelpers.nowMillis,
+                           agreementName: String = DSLAgreement.DefaultAgreementName) = {
     val now = 0L
     UserState(
       userId,
@@ -58,11 +60,13 @@ class UserStateComputations extends Loggable {
       0L,
       false,
       null,
-      Nil, Nil,Nil,
-      0L,
-      ActiveSuspendedSnapshot(false, now),
+      ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+      Nil, Nil,
+      LatestResourceEventsSnapshot(List(), now),
+      0L, 0L,
+      ActiveStateSnapshot(false, now),
       CreditSnapshot(0, now),
-      AgreementSnapshot(Agreement(agreementName, now, -1) :: Nil, now),
+      AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
       RolesSnapshot(List(), now),
       OwnedResourcesSnapshot(List(), now)
     )
@@ -76,19 +80,20 @@ class UserStateComputations extends Loggable {
         0L,
         false,
         null,
-        Nil, Nil,Nil,
-        0L,
-        ActiveSuspendedSnapshot(false, now),
+        ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+        Nil, Nil,
+        LatestResourceEventsSnapshot(List(), now),
+        0L, 0L,
+        ActiveStateSnapshot(false, now),
         CreditSnapshot(0, now),
-        AgreementSnapshot(Agreement(agreementName, now, - 1) :: Nil, now),
+        AgreementSnapshot(Agreement(agreementName, now) :: Nil, now),
         RolesSnapshot(List(), now),
         OwnedResourcesSnapshot(List(), now)
       )
     }
 
   def findUserStateAtEndOfBillingMonth(userId: String,
-                                       yearOfBillingMonth: Int,
-                                       billingMonth: Int,
+                                       billingMonthInfo: BillingMonthInfo,
                                        userStateStore: UserStateStore,
                                        resourceEventStore: ResourceEventStore,
                                        policyStore: PolicyStore,
@@ -97,26 +102,20 @@ class UserStateComputations extends Loggable {
                                        zeroUserState: UserState, 
                                        defaultPolicy: DSLPolicy,
                                        defaultResourcesMap: DSLResourcesMap,
-                                       accounting: Accounting): Maybe[UserState] = {
+                                       accounting: Accounting,
+                                       contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
 
-    def D(fmt: String, args: Any*) = {
-      logger.debug("[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, fmt.format(args:_*)))
-    }
-
-    def E(fmt: String, args: Any*) = {
-      logger.error("[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, fmt.format(args:_*)))
-    }
-
-    def W(fmt: String, args: Any*) = {
-      logger.error("[%s, %s-%02d] %s".format(userId, yearOfBillingMonth, billingMonth, fmt.format(args:_*)))
-    }
+    val clog = ContextualLogger.fromOther(
+      contextualLogger,
+      logger,
+      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
+    clog.begin()
 
     def doCompute: Maybe[UserState] = {
-      D("Computing full month billing")
+      clog.debug("Computing full month billing")
       doFullMonthlyBilling(
         userId,
-        yearOfBillingMonth,
-        billingMonth,
+        billingMonthInfo,
         userStateStore,
         resourceEventStore,
         policyStore,
@@ -125,48 +124,81 @@ class UserStateComputations extends Loggable {
         zeroUserState,
         defaultPolicy,
         defaultResourcesMap,
-        accounting)
+        accounting,
+        Just(clog))
     }
 
-    val billingMonthStartDateCalc = new DateCalculator(yearOfBillingMonth, billingMonth)
-    val billingMonthStartMillis = billingMonthStartDateCalc.toMillis
-    val billingMonthStopMillis  = billingMonthStartDateCalc.goEndOfThisMonth.toMillis
+    val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
+    val billingMonthStartMillis = billingMonthInfo.startMillis
+    val billingMonthStopMillis  = billingMonthInfo.stopMillis
 
-    if(billingMonthStartMillis > userCreationMillis) {
+    if(billingMonthStopMillis < userCreationMillis) {
       // If the user did not exist for this billing month, piece of cake
-      D("User did not exists before %s. Returning %s", new DateCalculator(userCreationMillis), zeroUserState)
-      Just(zeroUserState)
+      clog.debug("User did not exist before %s", userCreationDateCalc)
+      clog.debug("Returning ZERO state %s".format(zeroUserState))
+      clog.endWith(Just(zeroUserState))
     } else {
-      resourceEventStore.countOutOfSyncEventsForBillingPeriod(userId, billingMonthStartMillis, billingMonthStopMillis) match {
-        case Just(outOfSyncEventCount) ⇒
-          // Have out of sync, so must recompute
-          D("Found %s out of sync events, will have to (re)compute user state", outOfSyncEventCount)
-          doCompute
+      // Ask DB cache for the latest known user state for this billing period
+      val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
+        userId,
+        billingMonthInfo.year,
+        billingMonthInfo.month)
+
+      latestUserStateM match {
         case NoVal ⇒
-          // No out of sync events, ask DB cache
-          userStateStore.findLatestUserStateForEndOfBillingMonth(userId, yearOfBillingMonth, billingMonth) match {
-            case just @ Just(userState) ⇒
-              // Found from cache
-              D("Found from cache: %s", userState)
-              just
-            case NoVal ⇒
-              // otherwise compute
-              D("No user state found from cache, will have to (re)compute")
-              doCompute
-            case failed @ Failed(_, _) ⇒
-              W("Failure while quering cache for user state: %s", failed)
-              failed
-          }
+          // Not found, must compute
+          clog.debug("No user state found from cache, will have to (re)compute")
+          clog.endWith(doCompute)
+          
         case failed @ Failed(_, _) ⇒
-          W("Failure while querying for out of sync events: %s", failed)
-          failed
+          clog.warn("Failure while quering cache for user state: %s", failed)
+          clog.endWith(failed)
+
+        case Just(latestUserState) ⇒
+          // Found a "latest" user state, but we need to verify that it is indeed the one true latest.
+          // For this reason, we must count the events again.
+         val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+         val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+           userId,
+           billingMonthStartMillis,
+           billingMonthStopMillis)
+
+         actualOOSEventsCounterM match {
+           case NoVal ⇒
+             val errMsg = "No counter computed for out of sync events. Should at least be zero."
+             clog.warn(errMsg)
+             clog.endWith(Failed(new Exception(errMsg)))
+
+           case failed @ Failed(_, _) ⇒
+             clog.warn("Failure while querying for out of sync events: %s", failed)
+             clog.endWith(failed)
+
+           case Just(actualOOSEventsCounter) ⇒
+             val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+             counterDiff match {
+               // ZERO, we are OK!
+               case 0 ⇒
+                 latestUserStateM
+
+               // We had more, so must recompute
+               case n if n > 0 ⇒
+                 clog.debug(
+                   "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+                 clog.endWith(doCompute)
+
+               // We had less????
+               case n if n < 0 ⇒
+                 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+                 clog.warn(errMsg)
+                 clog.endWith(Failed(new Exception(errMsg)))
+             }
+         }
       }
     }
   }
 
   def doFullMonthlyBilling(userId: String,
-                           yearOfBillingMonth: Int,
-                           billingMonth: Int,
+                           billingMonthInfo: BillingMonthInfo,
                            userStateStore: UserStateStore,
                            resourceEventStore: ResourceEventStore,
                            policyStore: PolicyStore,
@@ -175,11 +207,22 @@ class UserStateComputations extends Loggable {
                            zeroUserState: UserState,
                            defaultPolicy: DSLPolicy,
                            defaultResourcesMap: DSLResourcesMap,
-                           accounting: Accounting): Maybe[UserState] = Maybe {
+                           accounting: Accounting,
+                           contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
+
+    def rcDebugInfo(rcEvent: ResourceEvent) = {
+      rcEvent.toDebugString(defaultResourcesMap, false)
+    }
+
+    val clog = ContextualLogger.fromOther(
+      contextualLogger,
+      logger,
+      "doFullMonthlyBilling(%s)", billingMonthInfo)
+    clog.begin()
+
     val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
       userId,
-      yearOfBillingMonth,
-      billingMonth,
+      billingMonthInfo.previousMonth,
       userStateStore,
       resourceEventStore,
       policyStore,
@@ -188,69 +231,244 @@ class UserStateComputations extends Loggable {
       zeroUserState,
       defaultPolicy,
       defaultResourcesMap,
-      accounting
+      accounting,
+      Just(clog)
     )
     
     previousBillingMonthUserStateM match {
       case NoVal ⇒
-        NoVal // not really...
-      case failed @ Failed(_, _) ⇒
-        failed
+        null // not really... (must throw an exception here probably...)
+      case failed @ Failed(e, _) ⇒
+        throw e
       case Just(startingUserState) ⇒
         // This is the real deal
 
-        val billingMonthStartDateCalc = new DateCalculator(yearOfBillingMonth, billingMonth)
-        val billingMonthEndDateCalc   = billingMonthStartDateCalc.copy.goEndOfThisMonth
-        val billingMonthStartMillis = billingMonthStartDateCalc.toMillis
-        val billingMonthEndMillis  = billingMonthEndDateCalc.toMillis
+        // This is a collection of all the latest resource events.
+        // We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
+        // ones.
+        // Will be updated on processing the next resource event.
+        val previousResourceEvents = startingUserState.latestResourceEventsSnapshot.toMutableWorker
+        clog.debug("previousResourceEvents = %s", previousResourceEvents)
 
-        // Keep the working (current) user state. This will get updated as we proceed billing within the month
+        val billingMonthStartMillis = billingMonthInfo.startMillis
+        val billingMonthEndMillis = billingMonthInfo.stopMillis
+
+        // Keep the working (current) user state. This will get updated as we proceed with billing for the month
+        // specified in the parameters.
         var _workingUserState = startingUserState
 
+        // Prepare the implicitly terminated resource events from previous billing period
+        val implicitlyTerminatedResourceEvents = _workingUserState.implicitlyTerminatedSnapshot.toMutableWorker
+        if(implicitlyTerminatedResourceEvents.size > 0) {
+          clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
+          clog.withIndent {
+            implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
+          }
+        }
+
+        // Keep the resource events from this period that were first (and unused) of their kind
+        val ignoredFirstResourceEvents = IgnoredFirstResourceEventsWorker.Empty
+
+        /**
+         * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
+         * events and b) the explicit previous resource events. If the event is found, it is removed from the
+         * respective source.
+         *
+         * If the event is not found, then this must be for a new resource instance.
+         * (and probably then some `zero` resource event must be implied as the previous one)
+         * 
+         * @param resource
+         * @param instanceId
+         * @return
+         */
+        def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
+          // implicitly terminated events are checked first
+          implicitlyTerminatedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+            case just @ Just(_) ⇒
+              just
+            case NoVal ⇒
+              // explicit previous resource events are checked second
+              previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+                case just @ Just(_) ⇒
+                  just
+                case noValOrFailed ⇒
+                  noValOrFailed
+              }
+            case failed ⇒
+              failed
+          }
+        }
+
+        // Find the actual resource events from DB
         val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
           userId,
           billingMonthStartMillis,
           billingMonthEndMillis)
-    }
+        var _eventCounter = 0
 
-    null
-  }
+        clog.debug("resourceEventStore = %s".format(resourceEventStore))
+        if(allResourceEventsForMonth.size > 0) {
+          clog.debug("Found %s resource events, starting processing...", allResourceEventsForMonth.size)
+        } else {
+          clog.debug("Not found any resource events")
+        }
 
+        for {
+          currentResourceEvent <- allResourceEventsForMonth
+        } {
+          _eventCounter = _eventCounter + 1
+          val theResource = currentResourceEvent.safeResource
+          val theInstanceId = currentResourceEvent.safeInstanceId
+          val theValue = currentResourceEvent.value
 
-  /**
-   * Find the previous resource event, if needed by the event's cost policy,
-   * in order to use it for any credit calculations.
-   */
-  def findPreviousRCEventOf(rcEvent: ResourceEvent,
-                            costPolicy: DSLCostPolicy,
-                            previousRCEventsMap: mutable.Map[ResourceEvent.FullResourceType, ResourceEvent]): Maybe[ResourceEvent] = {
-
-    if(costPolicy.needsPreviousEventForCreditCalculation) {
-      // Get a previous resource only if this is needed by the policy
-      previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
-        case Some(previousRCEvent) ⇒
-          Just(previousRCEvent)
-        case None ⇒
-          queryForPreviousRCEvent(rcEvent)
-      }
-    } else {
-      // No need for previous event. Will return NoVal
-      NoVal
-    }
-  }
+          clog.indent()
+          clog.debug("")
+          clog.debug("Processing %s", currentResourceEvent)
+          clog.debug("+========= %s", rcDebugInfo(currentResourceEvent))
 
-  /**
-   * FIXME: implement
-   */
-  def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
-    NoVal
-  }
+          clog.indent()
+
+          if(previousResourceEvents.size > 0) {
+            clog.debug("%s previousResourceEvents", previousResourceEvents.size)
+            clog.withIndent {
+              previousResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
+            }
+          }
+          if(implicitlyTerminatedResourceEvents.size > 0) {
+            clog.debug("%s implicitlyTerminatedResourceEvents", implicitlyTerminatedResourceEvents.size)
+            clog.withIndent {
+              implicitlyTerminatedResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
+            }
+          }
+          if(ignoredFirstResourceEvents.size > 0) {
+            clog.debug("%s ignoredFirstResourceEvents", ignoredFirstResourceEvents.size)
+            clog.withIndent {
+              ignoredFirstResourceEvents.foreach(ev ⇒ clog.debug("%s", rcDebugInfo(ev)))
+            }
+          }
+
+          // Ignore the event if it is not billable (but still record it in the "previous" stuff).
+          // But to make this decision, first we need the resource definition (and its cost policy).
+          val resourceDefM = defaultResourcesMap.findResourceM(theResource)
+          resourceDefM match {
+            // We have a resource (and thus a cost policy)
+            case Just(resourceDef) ⇒
+              val costPolicy = resourceDef.costPolicy
+              clog.debug("Cost policy: %s", costPolicy)
+              val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
+              isBillable match {
+                // The resource event is not billable
+                case false ⇒
+                  clog.debug("Ignoring not billable event %s", rcDebugInfo(currentResourceEvent))
+
+                // The resource event is billable
+                case true ⇒
+                  // Find the previous event.
+                  // This is (potentially) needed to calculate new credit amount and new resource instance amount
+                  val previousResourceEventM = findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+                  clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
+
+                  val havePreviousResourceEvent = previousResourceEventM.isJust
+                  val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
+                  if(needPreviousResourceEvent && !havePreviousResourceEvent) {
+                    // This must be the first resource event of its kind, ever.
+                    // TODO: We should normally check the DB to verify the claim (?)
+                    clog.info("Ignoring first event of its kind %s", rcDebugInfo(currentResourceEvent))
+                    ignoredFirstResourceEvents.updateResourceEvent(currentResourceEvent)
+                  } else {
+                    val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
+                    val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
+                    val oldCredits = _workingUserState.creditsSnapshot.creditAmount
+
+                    // A. Compute new resource instance accumulating amount
+                    val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
+                    
+                    clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
 
-  type FullResourceType = ResourceEvent.FullResourceType
-  def updatePreviousRCEventWith(previousRCEventsMap: mutable.Map[FullResourceType, ResourceEvent],
-                                newRCEvent: ResourceEvent): Unit = {
-    previousRCEventsMap(newRCEvent.fullResourceInfo) = newRCEvent
+                    // B. Compute new wallet entries
+                    val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
+
+                    val fullChargeslotsM = accounting.computeFullChargeslots(
+                      previousResourceEventM,
+                      currentResourceEvent,
+                      oldCredits,
+                      oldAmount,
+                      newAmount,
+                      resourceDef,
+                      defaultResourcesMap,
+                      alltimeAgreements,
+                      SimpleCostPolicyAlgorithmCompiler,
+                      policyStore,
+                      Just(clog)
+                    )
+
+                    // We have the chargeslots, let's associate them with the current event
+                    fullChargeslotsM match {
+                      case Just(fullChargeslots) ⇒
+                        if(fullChargeslots.length == 0) {
+                          // At least one chargeslot is required.
+                          throw new Exception("No chargeslots computed")
+                        }
+                        clog.debug("chargeslots:")
+                        clog.withIndent {
+                          for(fullChargeslot <- fullChargeslots) {
+                            clog.debug("%s", fullChargeslot)
+                          }
+                        }
+                        
+                        // C. Compute new credit amount (based on the charge slots)
+                        val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+                        val newCredits = oldCredits + newCreditsDiff
+                        clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
+
+                        val newWalletEntry = NewWalletEntry(
+                          userId,
+                          newCreditsDiff,
+                          oldCredits,
+                          newCredits,
+                          TimeHelpers.nowMillis,
+                          billingMonthInfo.year,
+                          billingMonthInfo.month,
+                          currentResourceEvent,
+                          previousResourceEventM.toOption,
+                          fullChargeslots,
+                          resourceDef
+                        )
+
+                        clog.debug("New %s", newWalletEntry)
+
+                      case NoVal ⇒
+                        // At least one chargeslot is required.
+                        throw new Exception("No chargeslots computed")
+
+                      case failed @ Failed(e, m) ⇒
+                        throw new Exception(m, e)
+                    }
+                  }
+
+              }
+
+              // After processing any event, billable or not, update the previous state
+              previousResourceEvents.updateResourceEvent(currentResourceEvent)
+
+            // We do not have a resource (and no cost policy)
+            case NoVal ⇒
+              // Now, this is a matter of politics: what do we do if no policy was found?
+              clog.error("No cost policy for %s", rcDebugInfo(currentResourceEvent))
+
+            // Could not retrieve resource (unlikely to happen)
+            case failed @ Failed(e, m) ⇒
+              clog.error("Error obtaining cost policy for %s", rcDebugInfo(currentResourceEvent))
+              clog.error(e, m)
+          }
+
+          clog.unindent()
+          clog.debug("-========= %s", rcDebugInfo(currentResourceEvent))
+          clog.unindent()
+        }
+        
+
+        clog.endWith(_workingUserState)
+    }
   }
 }
-
-object DefaultUserStateComputations extends UserStateComputations
\ No newline at end of file