WIP: New state machine for message processing
[aquarium] / src / main / scala / gr / grnet / aquarium / charging / ContinuousChargingBehavior.scala
index 7241a77..124da45 100644 (file)
 
 package gr.grnet.aquarium.charging
 
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
-import scala.collection.mutable
-import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import gr.grnet.aquarium.charging.state.UserAgreementHistoryModel
+import gr.grnet.aquarium.computation.BillingMonthInfo
+import gr.grnet.aquarium.event.{CreditsModel, DetailsModel}
+import gr.grnet.aquarium.message.MessageConstants
+import gr.grnet.aquarium.message.avro.gen.{UserStateMsg, WalletEntryMsg, ResourcesChargingStateMsg, ResourceTypeMsg, ResourceInstanceChargingStateMsg, ResourceEventMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, AvroHelpers, MessageFactory}
+import gr.grnet.aquarium.util.LogHelpers.Debug
+import gr.grnet.aquarium.{AquariumInternalError, Aquarium}
 
 /**
  * In practice a resource usage will be charged for the total amount of usage
@@ -47,63 +52,195 @@ import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-final class ContinuousChargingBehavior
-    extends ChargingBehavior(
-      ChargingBehaviorAliases.continuous,
-      Set(ChargingBehaviorNameInput, UnitPriceInput, OldTotalAmountInput, TimeDeltaInput)) {
-
-  protected def computeSelectorPath(
-      chargingData: mutable.Map[String, Any],
-      currentResourceEvent: ResourceEventModel,
-      referenceTimeslot: Timeslot,
-      previousValue: Double,
-      totalCredits: Double,
-      oldAccumulatingAmount: Double,
-      newAccumulatingAmount: Double
-  ): List[String] = {
-    Nil
+final class ContinuousChargingBehavior extends ChargingBehaviorSkeleton(Nil) {
+
+  /**
+   * Computes the credits to charge for one time interval of a continuously billed resource.
+   *
+   * The charge is `hours * MBs * unitPrice`. The hours and MBs figures are computed once and
+   * used both for the charge and for the explanation, so the explanation always describes
+   * the arithmetic that was actually performed.
+   *
+   * @param resourceInstanceChargingState state whose old accumulating amount is charged
+   *                                      (converted with `MBsOfBytes`, so presumably bytes)
+   * @param oldCredits                    not consulted by this charging behavior
+   * @param timeDeltaMillis               length of the charged interval in milliseconds
+   * @param unitPrice                     price applied per hour and per MB
+   * @return the credits to subtract and a human-readable explanation of the formula
+   */
+  def computeCreditsToSubtract(
+      resourceInstanceChargingState: ResourceInstanceChargingStateMsg,
+      oldCredits: Double,
+      timeDeltaMillis: Long,
+      unitPrice: Double
+  ): (Double /* credits */, String /* explanation */) = {
+
+    val hours = HrsOfMillis(timeDeltaMillis)
+    val megabytes = MBsOfBytes(resourceInstanceChargingState.getOldAccumulatingAmount)
+
+    // Charge on the same MB figure that the explanation reports, not on the raw amount.
+    val credits = hours * megabytes * unitPrice
+    val explanation = "Hours(%s) * MBs(%s) * UnitPrice(%s)".format(
+      hours,
+      megabytes,
+      unitPrice
+    )
+
+    (credits, explanation)
   }
 
-  def computeNewAccumulatingAmount(oldAmount: Double, newEventValue: Double, details: Map[String, String]): Double = {
-    // If the total is in the details, get it, or else compute it
-    details.get("total") match {
-      case Some(total) ⇒
-        total.toDouble
+  /**
+   * Computes the selector path for this charging behavior.
+   *
+   * None of the arguments are consulted: the result is always the one-element list
+   * holding `MessageConstants.DefaultSelectorKey`.
+   */
+  def computeSelectorPath(
+      chargingBehaviorDetails: DetailsModel.Type,
+      resourceInstanceChargingState: ResourceInstanceChargingStateMsg,
+      currentResourceEvent: ResourceEventMsg,
+      referenceStartMillis: Long,
+      referenceStopMillis: Long,
+      totalCredits: Double
+  ): List[String] =
+    MessageConstants.DefaultSelectorKey :: Nil
 
-      case _ ⇒
-        oldAmount + newEventValue
-    }
+  /** Initial charging details for this behavior: whatever `DetailsModel.make` produces. */
+  def initialChargingDetails = {
+    val freshDetails = DetailsModel.make
+    freshDetails
   }
 
-  def getResourceInstanceInitialAmount: Double = {
-    0.0
+  /**
+   * New accumulating amount of a resource instance: its old accumulating amount plus its
+   * current value, both read from the charging state and combined with `CreditsModel.add`.
+   *
+   * @param resourceInstanceChargingState state providing the two operands
+   * @param eventDetails                  not consulted by this charging behavior
+   */
+  def computeNewAccumulatingAmount(
+      resourceInstanceChargingState: ResourceInstanceChargingStateMsg,
+      eventDetails: DetailsModel.Type
+  ) = {
+    CreditsModel.add(
+      CreditsModel.from(resourceInstanceChargingState.getOldAccumulatingAmount),
+      CreditsModel.from(resourceInstanceChargingState.getCurrentValue)
+    )
   }
 
-  /**
-   * This is called when we have the very first event for a particular resource instance, and we want to know
-   * if it is billable or not.
-   */
-  def isBillableFirstEvent(event: ResourceEventModel) = {
-    true
+  /**
+   * Builds the implicit end event that corresponds to the given resource event.
+   *
+   * The new event is a copy of `resourceEvent` whose details are a copy of the original
+   * details marked via `MessageHelpers.setAquariumSyntheticAndImplicitEnd`, and whose
+   * occurred and received timestamps are both set to `newOccurredMillis`.
+   */
+  def constructImplicitEndEventFor(resourceEvent: ResourceEventMsg, newOccurredMillis: Long) = {
+    val implicitEndDetails = DetailsModel.copyOf(resourceEvent.getDetails)
+    MessageHelpers.setAquariumSyntheticAndImplicitEnd(implicitEndDetails)
+
+    // FIXME: What value ?
+    ResourceEventMsg
+      .newBuilder(resourceEvent)
+      .setDetails(implicitEndDetails)
+      .setOccurredMillis(newOccurredMillis)
+      .setReceivedMillis(newOccurredMillis)
+      .build()
   }
 
-  def mustGenerateDummyFirstEvent = true
+  def processResourceEvent( // Charges one incoming resource event for the interval since the previous event of the same instance
+       aquarium: Aquarium,
+       resourceEvent: ResourceEventMsg, // the newly arrived event being processed
+       resourceType: ResourceTypeMsg,
+       billingMonthInfo: BillingMonthInfo, // its monthStartMillis is a lower bound for the dummy first event
+       resourcesChargingState: ResourcesChargingStateMsg, // the per-instance state found in here is updated by this call
+       userAgreementHistoryModel: UserAgreementHistoryModel,
+       userStateMsg: UserStateMsg, // only getTotalCredits is read here
+       walletEntryRecorder: WalletEntryMsg ⇒ Unit // forwarded to computeWalletEntriesForNewEvent
+   ): (Int, CreditsModel.Type) = { // the result of computeWalletEntriesForNewEvent, returned unchanged
 
-  def supportsImplicitEvents = {
-    true
-  }
+    // 1. Ensure proper initial state per resource and per instance
+    ensureInitializedWorkingState(resourcesChargingState, resourceEvent)
+
+    // 2. Fill in data from the new event
+    val stateOfResourceInstance = resourcesChargingState.getStateOfResourceInstance
+    val resourcesChargingStateDetails = resourcesChargingState.getDetails
+    val instanceID = resourceEvent.getInstanceID
+    val resourceInstanceChargingState = stateOfResourceInstance.get(instanceID) // presumably non-null after step 1 - TODO confirm
+
+    fillWorkingResourceInstanceChargingStateFromEvent(resourceInstanceChargingState, resourceEvent)
+
+    val previousEvents = resourceInstanceChargingState.getPreviousEvents
+    val previousEvent = previousEvents.size() match { // 3. Pick (or synthesize) the event that starts the charged interval
+      case 0 ⇒
+        // We do not have the needed previous event, so this must be the first resource event of its kind, ever.
+        // Let's see if we can create a dummy previous event.
+        Debug(logger, "First event of its kind %s", AvroHelpers.jsonStringOfSpecificRecord(resourceEvent))
+
+        val dummyFirstEventValue = "0.0" // TODO ? From configuration
+
+        val millis = userAgreementHistoryModel.agreementByTimeslot.headOption match { // timestamp for the dummy event
+          case None =>
+            throw new AquariumInternalError("No agreement!!!") // FIXME Better explanation
+          case Some((_,aggr)) =>
+            val millisAgg = aggr.timeslot.from.getTime // start of the first agreement's timeslot
+            val millisMon = billingMonthInfo.monthStartMillis
+            if(millisAgg>millisMon) millisAgg else millisMon // the later of agreement start and billing month start
+        }
 
-  def mustConstructImplicitEndEventFor(resourceEvent: ResourceEventModel) = {
-    true
+        val dummyFirstEvent = constructDummyFirstEventFor(resourceEvent, millis, dummyFirstEventValue)
+
+        Debug(logger, "Dummy first event %s", AvroHelpers.jsonStringOfSpecificRecord(dummyFirstEvent))
+        dummyFirstEvent
+
+
+      case _ ⇒
+        val previousEvent = previousEvents.get(0) // head is most recent
+        Debug(logger, "I have previous event %s", AvroHelpers.jsonStringOfSpecificRecord(previousEvent))
+        previousEvent
+
+    }
+
+    val retval = computeWalletEntriesForNewEvent( // 4. Charge the interval [previous event, new event]
+      resourceEvent,
+      resourceType,
+      billingMonthInfo,
+      userStateMsg.getTotalCredits,
+      previousEvent.getOccurredMillis, // interval start
+      resourceEvent.getOccurredMillis, // interval end
+      userAgreementHistoryModel.agreementByTimeslot,
+      resourcesChargingStateDetails,
+      resourceInstanceChargingState,
+      aquarium,
+      walletEntryRecorder
+    )
+
+    // We need just one previous event, so we update it
+    MessageHelpers.setOnePreviousEvent(resourceInstanceChargingState, resourceEvent) // the new event becomes the previous one for the next call
+    assert(resourceInstanceChargingState.getPreviousEvents.size() == 1)
+
+    retval
   }
 
-  def constructImplicitEndEventFor(resourceEvent: ResourceEventModel, newOccurredMillis: Long) = {
-    assert(supportsImplicitEvents && mustConstructImplicitEndEventFor(resourceEvent))
+  def createVirtualEventsForRealtimeComputation( // Builds the one synthetic, realtime-virtual event for a resource instance
+      userID: String,
+      resourceTypeName: String,
+      resourceInstanceID: String,
+      eventOccurredMillis: Long, // passed twice to the factory below
+      resourceInstanceChargingState: ResourceInstanceChargingStateMsg // source of the previous event whose path/action are reused
+  ): List[ResourceEventMsg] = { // always a single-element list
+    // FIXME This is too adhoc...
+    val path = resourceInstanceChargingState.getPreviousEvents.size() match { // "path" detail of the most recent previous event
+      case 0 ⇒
+        "unknown" // FIXME This should not happen. Throw?
 
-    val details = resourceEvent.details
-    val newDetails = ResourceEventModel.setAquariumSyntheticAndImplicitEnd(details)
+      case _ ⇒
+        val previousEvent = resourceInstanceChargingState.getPreviousEvents.get(0)
+        previousEvent.getDetails.get(MessageConstants.DetailsKeys.path) match {
+          case null ⇒
+            "unknown" // FIXME This should not happen. Throw?
+
+          case path ⇒
+            MessageHelpers.stringOfAnyValueMsg(path)
+        }
+    }
+
+    // FIXME This is too adhoc...
+    val action = resourceInstanceChargingState.getPreviousEvents.size() match { // "action" detail of the most recent previous event
+      case 0 ⇒
+        "unknown" // FIXME This should not happen. Throw?
+
+      case _ ⇒
+        val previousEvent = resourceInstanceChargingState.getPreviousEvents.get(0)
+        previousEvent.getDetails.get(MessageConstants.DetailsKeys.action) match {
+          case null ⇒
+            "update" // fallback when the previous event carries no action detail (differs from the "unknown" used above)
+
+          case action ⇒
+            MessageHelpers.stringOfAnyValueMsg(action)
+        }
+    }
 
-    resourceEvent.withDetails(newDetails, newOccurredMillis)
+    MessageFactory.newResourceEventMsg(
+      MessageHelpers.VirtualEventsIDGen.nextUID(), // fresh ID from the virtual-events generator
+      eventOccurredMillis, // presumably the occurred timestamp - TODO confirm against MessageFactory
+      eventOccurredMillis, // presumably the received timestamp - TODO confirm against MessageFactory
+      userID,
+      "aquarium",
+      resourceTypeName,
+      resourceInstanceID,
+      "0.0",
+      MessageConstants.EventVersion_1_0,
+      MessageFactory.newDetails(
+        MessageFactory.newBooleanDetail(MessageConstants.DetailsKeys.aquarium_is_synthetic, true),
+        MessageFactory.newBooleanDetail(MessageConstants.DetailsKeys.aquarium_is_realtime_virtual, true),
+        MessageFactory.newStringDetail(MessageConstants.DetailsKeys.path, path),
+        MessageFactory.newStringDetail(MessageConstants.DetailsKeys.action, action)
+      )
+    ) :: Nil
   }
 }