COntinue with billing code and start cleaning up user actor
authorChristos KK Loverdos <loverdos@gmail.com>
Mon, 30 Jan 2012 13:21:52 +0000 (15:21 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Mon, 30 Jan 2012 13:21:52 +0000 (15:21 +0200)
src/main/scala/gr/grnet/aquarium/logic/accounting/Accounting.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/DSLCostPolicy.scala
src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

index b3d247c..b6fda15 100644 (file)
@@ -49,7 +49,7 @@ import com.ckkloverdos.maybe.{NoVal, Maybe, Failed, Just}
  */
 trait Accounting extends DSLUtils with Loggable {
 
-  def chargeEvent( oldResourceEvent: ResourceEvent,
+  def chargeEvent2( oldResourceEventM: Maybe[ResourceEvent],
                    newResourceEvent: ResourceEvent,
                    dslAgreement: DSLAgreement,
                    lastSnapshotDate: Date,
@@ -64,7 +64,7 @@ trait Accounting extends DSLUtils with Loggable {
 
           val costPolicy = dslResource.costPolicy
           val isDiscrete = costPolicy.isDiscrete
-          val oldValue = oldResourceEvent.value
+          val oldValueM = oldResourceEventM.map(_.value)
           val newValue = newResourceEvent.value
 
           /* This is a safeguard against the special case where the last
@@ -76,7 +76,7 @@ trait Accounting extends DSLUtils with Loggable {
           if (lastSnapshotDate.getTime == resourceEvent.occurredMillis && !isDiscrete) {
             Just(List())
           } else {
-            val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValue, newValue).forNoVal(Just(0.0))
+            val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(oldValueM, newValue).forNoVal(Just(0.0))
             for {
               amount <- creditCalculationValueM
             } yield {
@@ -84,7 +84,7 @@ trait Accounting extends DSLUtils with Loggable {
               // above, since this point won't be reached in case of error.
               val isFinal = dslResource.costPolicy match {
                 case OnOffCostPolicy =>
-                  OnOffPolicyResourceState(oldValue) match {
+                  OnOffPolicyResourceState(oldValueM) match {
                     case OnResourceState => false
                     case OffResourceState => true
                   }
@@ -164,7 +164,7 @@ trait Accounting extends DSLUtils with Loggable {
       }
     }
 
-    val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(currentValue, resourceEvent.value)
+    val creditCalculationValueM = dslResource.costPolicy.getCreditCalculationValue(Just(currentValue), resourceEvent.value)
     val amount = creditCalculationValueM match {
       case failed @ Failed(_, _) ⇒
         return failed
index 0541da7..463a45f 100644 (file)
@@ -35,7 +35,7 @@
 
 package gr.grnet.aquarium.logic.accounting.dsl
 
-import com.ckkloverdos.maybe.{Failed, Just, Maybe}
+import com.ckkloverdos.maybe.{NoVal, Failed, Just, Maybe}
 
 
 /**
@@ -79,12 +79,12 @@ abstract class DSLCostPolicy(val name: String) {
   /**
    * Given the old value and a value from a resource event, compute the new one.
    */
-  def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double): Double
+  def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double]
 
   /**
    * Get the value that will be used in credit calculation in Accounting.chargeEvents
    */
-  def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double]
+  def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double]
 
   /**
    * An event's value by itself should carry enough info to characterize it billable or not.
@@ -133,12 +133,19 @@ case object ContinuousCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.contin
 
   override def resourceEventValueIsDiff = true
 
-  def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
-    oldValue + newEventValue
+  def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+    oldValueM match {
+      case Just(oldValue) ⇒
+        Just(oldValue + newEventValue)
+      case NoVal ⇒
+        Failed(new Exception("NoVal for oldValue instead of Just"))
+      case Failed(e, m) ⇒
+        Failed(new Exception("Failed for oldValue instead of Just", e), m)
+    }
   }
 
-  def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
-    Just(oldValue)
+  def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
+    oldValueM
   }
 }
 
@@ -159,27 +166,35 @@ case object OnOffCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.onoff) {
 
   override def resourceEventValueIsAbs = true
 
-  def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
-    newEventValue
+  def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+    Just(newEventValue)
   }
   
-  def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
+  def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
     import OnOffCostPolicyValues.{ON, OFF}
 
     def exception(rs: OnOffPolicyResourceState) =
       new Exception("Resource state transition error (%s -> %s)".format(rs, rs))
     def failed(rs: OnOffPolicyResourceState) =
       Failed(exception(rs))
-
-    (oldValue, newEventValue) match {
-      case (ON, ON) ⇒
-        failed(OnResourceState)
-      case (ON, OFF) ⇒
-        Just(OFF)
-      case (OFF, ON) ⇒
-        Just(ON)
-      case (OFF, OFF) ⇒
-        failed(OffResourceState)
+    
+    oldValueM match {
+      case Just(oldValue) ⇒
+        (oldValue, newEventValue) match {
+          case (ON, ON) ⇒
+            failed(OnResourceState)
+          case (ON, OFF) ⇒
+            Just(OFF)
+          case (OFF, ON) ⇒
+            Just(ON)
+          case (OFF, OFF) ⇒
+            failed(OffResourceState)
+        }
+
+      case NoVal ⇒
+        Failed(new Exception("NoVal for oldValue instead of Just"))
+      case Failed(e, m) ⇒
+        Failed(new Exception("Failed for oldValue instead of Just", e), m)
     }
   }
 
@@ -212,11 +227,11 @@ case object DiscreteCostPolicy extends DSLCostPolicy(DSLCostPolicyNames.discrete
 
   override def resourceEventValueIsDiff = true
 
-  def computeNewResourceInstanceValue(oldValue: Double, newEventValue: Double) = {
-    newEventValue
+  def computeNewResourceInstanceValue(oldValueM: Maybe[Double], newEventValue: Double) = {
+    Just(newEventValue)
   }
   
-  def getCreditCalculationValue(oldValue: Double, newEventValue: Double): Maybe[Double] = {
+  def getCreditCalculationValue(oldValueM: Maybe[Double], newEventValue: Double): Maybe[Double] = {
     Just(newEventValue)
   }
 }
index 7650c51..a545a1b 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium.logic.events
 import net.liftweb.json.{JsonAST, Xml}
 import gr.grnet.aquarium.util.json.JsonHelpers
 import gr.grnet.aquarium.logic.accounting.dsl._
+import com.ckkloverdos.maybe.Maybe
 
 /**
  * Event sent to Aquarium by clients for resource accounting.
@@ -114,6 +115,20 @@ case class ResourceEvent(
   }
 
   def copyWithReceivedMillis(millis: Long) = copy(receivedMillis = millis)
+
+  /**
+   * Find the cost policy of the resource named in this resource event.
+   *
+   * We do not expect cost policies for resources to change, because they are supposed
+   * to be one of their constant characteristics. That is why do not issue a time-dependent
+   * query here for the event's current policy.
+   *
+   * Should the need arises to change the cost policy for a resource, this is a good enough
+   * reason to consider creating another type of resource.
+   */
+  def findCostPolicy(defaultPolicy: DSLPolicy): Maybe[DSLCostPolicy] = {
+    defaultPolicy.findResource(this.safeResource).map(_.costPolicy): Maybe[DSLCostPolicy]
+  }
 }
 
 object ResourceEvent {
index 350e719..7c69317 100644 (file)
@@ -38,6 +38,7 @@ package user
 
 import gr.grnet.aquarium.util.json.JsonSupport
 import gr.grnet.aquarium.logic.accounting.Policy
+import com.ckkloverdos.maybe.{Maybe, Just}
 
 /**
  * Snapshot of data that are user-related.
@@ -125,26 +126,30 @@ case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshot
   def addOrUpdateResourceSnapshot(name: String,
                                   instanceId: String,
                                   newEventValue: Double,
-                                  snapshotTime: Long): (OwnedResourcesSnapshot, Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = {
+                                  snapshotTime: Long): (Maybe[OwnedResourcesSnapshot], Option[ResourceInstanceSnapshot], ResourceInstanceSnapshot) = {
 
     val newRCInstance = ResourceInstanceSnapshot(name, instanceId, newEventValue, snapshotTime)
     val oldRCInstanceOpt = this.findResourceSnapshot(name, instanceId)
-    val newData = oldRCInstanceOpt match {
+    val newDataM = oldRCInstanceOpt match {
       case Some(oldRCInstance) ⇒
         // Need to delete the old one and add the new one
         // FIXME: Get rid of this Policy.policy
         val costPolicy = Policy.policy.findResource(name).get.costPolicy
-        val newValue = costPolicy.computeNewResourceInstanceValue(oldRCInstance.value, newRCInstance.value/* =newEventValue */)
+        val newValueM = costPolicy.computeNewResourceInstanceValue(Just(oldRCInstance.value), newRCInstance.value/* =newEventValue */)
+        newValueM.map { newValue ⇒
+          newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId)))
+        }
 
-        newRCInstance.copy(data = newValue) :: (data.filterNot(_.isSameResource(name, instanceId)))
       case None ⇒
         // Resource not found, so this is the first time and we just add the new snapshot
-        newRCInstance :: data
+        Just(newRCInstance :: data)
     }
 
-    val newOwnedResources = this.copy(data = newData, snapshotTime = snapshotTime)
+    val newOwnedResourcesM = newDataM.map { newData ⇒
+      this.copy(data = newData, snapshotTime = snapshotTime)
+    }
 
-    (newOwnedResources, oldRCInstanceOpt, newRCInstance)
+    (newOwnedResourcesM, oldRCInstanceOpt, newRCInstance)
   }
 }
 
index cfd1d70..2e31644 100644 (file)
@@ -42,7 +42,7 @@ import gr.grnet.aquarium.util.date.DateCalculator
 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
 import gr.grnet.aquarium.logic.accounting.Accounting
 import gr.grnet.aquarium.logic.events.ResourceEvent
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLAgreement}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLCostPolicy, DSLPolicy, DSLAgreement}
 
 sealed abstract class CalculationType(_name: String) {
   def name = _name
@@ -213,7 +213,7 @@ class UserStateComputations {
     // implied in order to do billing calculations (e.g. the "off" vmtime resource event)
     var workingUserState = newStartUserState
 
-    for(nextRCEvent <- allBillingPeriodRelevantRCEvents) {
+    for(newResourceEvent <- allBillingPeriodRelevantRCEvents) {
       // We need to do these kinds of calculations:
       // 1. Credit state calculations
       // 2. Resource state calculations
@@ -233,24 +233,45 @@ class UserStateComputations {
 
       // We need:
       // A. The previous event
-      def findPreviousRCEventOf(rcEvent: ResourceEvent): Option[ResourceEvent] = {
-        previousRCEventsMap.get(rcEvent.fullResourceInfo)
-      }
-      def updatePreviousRCEventWith(rcEvent: ResourceEvent): Unit = {
-        previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent
-      }
-      
-      val prevRCEvent = findPreviousRCEventOf(nextRCEvent) match {
-        case Some(prevRCEvent) ⇒
-          prevRCEvent
-        case None ⇒
-          // Must query the DB?????
-      }
 
+      /**
+       * FIXME: implement
+       */
+      def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
+        NoVal
+      }
 
-      // B. The current event: [✓][✔][✗][✘]☒ OK
+      def findPreviousRCEventOf(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
+        rcEvent.findCostPolicy(defaultPolicy) match {
+          case Just(costPolicy) ⇒
+            if(costPolicy.needsPreviousEventForCreditCalculation) {
+              // Get a previous resource only if this is needed by the policy
+              previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
+                case Some(previousRCEvent) ⇒
+                  Just(previousRCEvent)
+                case None ⇒
+                  queryForPreviousRCEvent(rcEvent)
+              }
+            } else {
+              // No need for previous event. Will return NoVal
+              NoVal
+            }
+
+          case NoVal ⇒
+            NoVal
+          case failed@ Failed(_, _) ⇒
+            failed
+        }
+        
+      }
 
-//      accounting.chargeEvent()
+      def updatePreviousRCEventWith(rcEventM: Maybe[ResourceEvent]): Unit = {
+        for(rcEvent <- rcEventM) {
+          previousRCEventsMap(rcEvent.fullResourceInfo) = rcEvent
+        }
+      }
+      
+      val oldResourceEventM = findPreviousRCEventOf(newResourceEvent)
     }
 
 
index 3dc1f42..b9848be 100644 (file)
@@ -38,7 +38,6 @@ package gr.grnet.aquarium.user.actor
 import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.processor.actor._
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
 import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
 import gr.grnet.aquarium.user._
 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
@@ -46,6 +45,7 @@ import java.util.Date
 import gr.grnet.aquarium.util.{DateUtils, Loggable}
 import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
 import gr.grnet.aquarium.util.date.TimeHelpers
+import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
 
 
 /**
@@ -153,7 +153,7 @@ class UserActor extends AquariumActor
     val eventsDB = _configurator.storeProvider.resourceEventStore
     val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
     val numResourceEvents = resourceEvents.size
-    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+//    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
 
     //Rebuild state from wallet entries
     val wallet = _configurator.storeProvider.walletEntryStore
@@ -198,31 +198,6 @@ class UserActor extends AquariumActor
     initState.copy(active = act, roles = rol)
   }
 
-  /**
-   * Replay resource events on the provided user state
-   */
-  def replayResourceEvents(initState: UserState, events: List[ResourceEvent],
-                           from: Long, to: Long): UserState = {
-    var res = initState.ownedResources
-    events
-      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
-      .foreach {
-        e =>
-          val name = Policy.policy.findResource(e.resource) match {
-            case Some(x) => x.name
-            case None => ""
-          }
-
-          val instanceId = e.instanceId
-          res = res.addOrUpdateResourceSnapshot(name,
-            instanceId, e.value, e.occurredMillis)._1
-    }
-    if (!events.isEmpty) {
-      val snapTime = events.map{e => e.occurredMillis}.max
-      res = res.copy(snapshotTime = snapTime)
-    }
-    initState.copy(ownedResources = res)
-  }
 
   /**
    * Replay wallet entries on the provided user state
@@ -245,77 +220,6 @@ class UserActor extends AquariumActor
   }
 
   /**
-   * Update wallet entries for all unprocessed events
-   */
-  def calcWalletEntries(): Unit = {
-    ensureUserState
-
-    if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return
-    val eventsDB = _configurator.storeProvider.resourceEventStore
-    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
-    val policy = Policy.policy
-
-    val walletEntries = resourceEvents.map {
-      ev =>
-        // TODO: Check that agreement exists
-        val agreement = policy.findAgreement(_userState.agreement.data).get
-
-        val resource = policy.findResource(ev.resource) match {
-          case Some(x) => x
-          case None =>
-            val errMsg = "Cannot find resource: %s".format(ev.resource)
-            ERROR(errMsg)
-            throw new AccountingException(errMsg) // FIXME: to throw or not to throw?
-        }
-
-        // get resource instance id *only* for complex resource
-        // otherwise we could have used `resource.findInstanceId(ev.details)`
-        val instid = resource.isComplex match {
-          case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
-          case false => None
-        }
-
-        var currentValue: Double = 0.0
-        var currentSnapshotTime = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId) match {
-          case Some(x) => x.snapshotTime
-          case None => Long.MaxValue //To trigger recalculation
-        }
-
-        if (currentSnapshotTime > ev.occurredMillis) {
-          //Event is older that current state. Rebuild state up to event timestamp
-          val resHistory =
-            ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.instanceId, ev.eventVersion, 0, ev.details) ::
-            eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
-          INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis));
-          var res = OwnedResourcesSnapshot(List(), 0)
-          resHistory.foreach {
-            e =>
-              // update resources state
-              res = res.addOrUpdateResourceSnapshot(e.resource, e.instanceId, e.value, e.occurredMillis)._1
-          }
-          currentSnapshotTime = res.findResourceSnapshot(ev.resource, ev.instanceId).get.snapshotTime
-          currentValue = res.findResourceSnapshot(ev.resource, ev.instanceId).get.data
-        } else {
-          currentValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.instanceId).get.data
-        }
-
-        val entries = chargeEvent(ev, agreement, currentValue, new Date(currentSnapshotTime),
-          findRelatedEntries(resource, ev.instanceId))
-        INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis))
-        entries match {
-          case Just(x) => x
-          case Failed(e, r) => List()
-          case NoVal => List()
-        }
-    }.flatten
-
-    val walletDB = _configurator.storeProvider.walletEntryStore
-    walletEntries.foreach(w => walletDB.storeWalletEntry(w))
-
-    ensureUserState
-  }
-
-  /**
    * Persist current user state
    */
   private[this] def saveUserState(): Unit = {
@@ -343,7 +247,7 @@ class UserActor extends AquariumActor
         ERROR("Received %s but my userId = %s".format(m, this._userId))
       } else {
         //ensureUserState()
-        calcWalletEntries()
+//        calcWalletEntries()
         //processResourceEvent(resourceEvent, true)
       }
 
@@ -357,7 +261,9 @@ class UserActor extends AquariumActor
 
     case m @ RequestUserBalance(userId, timestamp) ⇒
       if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
-        calcWalletEntries()
+      {
+//        calcWalletEntries()
+      }
       self reply UserResponseGetBalance(userId, _userState.credits.data)
 
     case m @ UserRequestGetState(userId, timestamp) ⇒