Reorg initialization seq
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
index fe606fd..af255de 100644 (file)
 
 package gr.grnet.aquarium
 
+import com.ckkloverdos.convert.Converters
 import com.ckkloverdos.env.Env
 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
+import com.ckkloverdos.maybe._
 import com.ckkloverdos.props.Props
+import com.ckkloverdos.sys.SysProp
 import connector.rabbitmq.RabbitMQProducer
-import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
-import java.io.File
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
+import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
-import com.ckkloverdos.convert.Converters
+import gr.grnet.aquarium.store.StoreProvider
+import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 import org.slf4j.{LoggerFactory, Logger}
-import com.ckkloverdos.maybe._
-import com.ckkloverdos.sys.SysProp
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
-import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
-import gr.grnet.aquarium.util.date.TimeHelpers
+import java.util.{Map ⇒ JMap}
+import java.util.{HashMap ⇒ JHashMap}
 
 /**
  *
@@ -59,12 +63,16 @@ import gr.grnet.aquarium.util.date.TimeHelpers
  */
 
 final class Aquarium(env: Env) extends Lifecycle with Loggable {
+
   import Aquarium.EnvKeys
 
   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
 
+  // Cached value of the latest resource mapping; initialised from the default policy message.
+  @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping
+
   private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
-    apply(EnvKeys.defaultPolicyModel),
+    apply(EnvKeys.defaultPolicyMsg),
     apply(EnvKeys.storeProvider).policyStore
   )
 
@@ -107,7 +115,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
     }
   }
 
-  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
+  private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
 
   private[this] def startServices(): Unit = {
     for(service ← _allServices) {
@@ -133,7 +141,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
     }
     this.eventsStoreFolder.throwMe // on error
 
-    logger.info("default policy = {}", defaultPolicyModel.toJsonString)
+    logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
   }
 
   private[this] def addShutdownHooks(): Unit = {
@@ -227,30 +235,70 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
 
   }
 
-  def currentResourceTypesMap: Map[String, ResourceType] = {
-    val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
-    if(policyOpt.isEmpty) {
-      throw new AquariumInternalError("Not even the default policy found")
+  /**
+   * @deprecated Use `currentResourceMapping` instead
+   */
+  def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
+    val policyMspOpt = policyStore.loadPolicyAt(millis)
+    if(policyMspOpt.isEmpty) {
+      throw new AquariumInternalError(
+        "Cannot get resource mapping. Not even the default policy found for time %s",
+        TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+      )
     }
 
-    policyOpt.get.resourceTypesMap
+    val policyMsg = policyMspOpt.get
+    policyMsg.getResourceMapping
   }
 
-  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
-    policyStore.loadValidPolicyAt(referenceTimeMillis) match {
+  /**
+   * Provides the current resource mapping. This value is cached in `_resourceMapping`.
+   * NOTE: The assumption is that the resource mapping is always updated with new keys,
+   *       that is we allow only the addition of new resource types.
+   */
+  def currentResourceMapping: JMap[String, ResourceTypeMsg] = {
+    val mapping = this._resourceMapping // single volatile read: the map locked is the map returned
+    mapping synchronized mapping
+  }
+
+  //  def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
+//    val policyMspOpt = policyStore.loadPolicyAt(millis)
+//    if(policyMspOpt.isEmpty) {
+//      throw new AquariumInternalError(
+//        "Cannot get resource types map. Not even the default policy found for time %s",
+//        TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+//      )
+//    }
+//
+//    val policyMsg = policyMspOpt.get
+//    // TODO optimize
+//    ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
+//  }
+//
+//  def currentResourceTypesMap: Map[String, ResourceType] = {
+//    resourceTypesMapAtMillis(TimeHelpers.nowMillis())
+//  }
+
+  def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
+    policyStore.loadPolicyAt(referenceTimeMillis) match {
       case None ⇒
         throw new AquariumInternalError(
           "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
         )
 
-      case Some(policy) ⇒
-        policy
+      case Some(policyMsg) ⇒
+        ModelFactory.newPolicyModel(policyMsg)
     }
   }
 
-  def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
-    val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
-    policyAtReferenceTime.roleMapping.get(role) match {
+  // The Avro message behind the policy model at the given time; a missing policy throws in the callee.
+  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg =
+    unsafeValidPolicyModelAt(referenceTimeMillis).msg
+
+  def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
+    val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
+
+    policyModelAtReferenceTime.roleMapping.get(role) match {
       case None ⇒
         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
           role,
@@ -262,40 +310,126 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
     }
   }
 
+  /**
+   * Looks up the full price table that the policy valid at `referenceTimeMillis` maps to `role`.
+   *
+   * Throws `AquariumInternalError` when that policy's role mapping has no entry for the role.
+   */
+  def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
+    val roleMapping = unsafeValidPolicyAt(referenceTimeMillis).getRoleMapping
+    // `get` answers null for an unknown role; Option(...) turns that into None.
+    Option(roleMapping.get(role)).getOrElse {
+      val when = TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+      throw new AquariumInternalError("Unknown price table for role %s at %s".format(role, when))
+    }
+  }
+
+  /**
+   * Resolves the full price table model that applies to a user agreement model.
+   * @param userAgreementModel the agreement whose `fullPriceTableRef` is resolved
+   * @param knownPolicyModel   the policy to consult, or `null` to load the policy valid at
+   *                           the agreement's `validFromMillis`
+   * @return the ad-hoc table carried by the reference, or the policy's table for the role
+   * @throws AquariumInternalError if the reference is policy-defined and the role is unknown
+   */
+  def unsafeFullPriceTableModelForAgreement(
+      userAgreementModel: UserAgreementModel,
+      knownPolicyModel: PolicyModel
+  ): FullPriceTableModel = {
+    // Resolved up front, as before, so a missing policy fails even for ad-hoc references.
+    val policyModel =
+      Option(knownPolicyModel).getOrElse(unsafeValidPolicyModelAt(userAgreementModel.validFromMillis))
+
+    userAgreementModel.fullPriceTableRef match {
+      case PolicyDefinedFullPriceTableRef ⇒
+        val role = userAgreementModel.role
+        policyModel.roleMapping.get(role).getOrElse {
+          throw new AquariumInternalError(
+            "Unknown role %s while computing full price table for user %s at %s",
+            role,
+            userAgreementModel.userID,
+            TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
+          )
+        }
+      case AdHocFullPriceTableRef(adHocTable) ⇒
+        adHocTable
+    }
+  }
+
+  /**
+   * Variant taking the policy as a `PolicyModel`; delegates to the `PolicyMsg` overload.
+   *
+   * @param userAgreement    the agreement message whose price table is resolved
+   * @param knownPolicyModel the policy to use, or `null` to load the one valid at the
+   *                         agreement's `getValidFromMillis`
+   */
+  def unsafeFullPriceTableForAgreement(
+      userAgreement: UserAgreementMsg,
+      knownPolicyModel: PolicyModel
+  ): FullPriceTableMsg = {
+    val policyModel =
+      Option(knownPolicyModel).getOrElse(unsafeValidPolicyModelAt(userAgreement.getValidFromMillis))
+    unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg)
+  }
+
+  /**
+   * Resolves the full price table message for a user agreement message.
+   *
+   * @param userAgreement the agreement; a non-null `getFullPriceTableRef` is returned as is
+   * @param knownPolicy   the policy to consult, or `null` to load the policy valid at the
+   *                      agreement's `getValidFromMillis`
+   * @return the agreement's own price table if present, else the policy's table for its role
+   * @throws AquariumInternalError if the agreement has no price table of its own and the
+   *                               policy's role mapping has no entry for the agreement's role
+   */
+  def unsafeFullPriceTableForAgreement(
+      userAgreement: UserAgreementMsg,
+      knownPolicy: PolicyMsg
+  ): FullPriceTableMsg = {
+    // Resolved up front, as before, so a missing policy fails even if the agreement has a table.
+    val policy =
+      Option(knownPolicy).getOrElse(unsafeValidPolicyAt(userAgreement.getValidFromMillis))
+    val role = userAgreement.getRole
+
+    // Both lookups answer null when absent; the role mapping is consulted only as a fallback.
+    Option(userAgreement.getFullPriceTableRef)
+      .orElse(Option(policy.getRoleMapping.get(role)))
+      .getOrElse {
+        throw new AquariumInternalError(
+          "Unknown role %s while computing full price table for user %s at %s",
+          role,
+          userAgreement.getUserID,
+          TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
+        )
+      }
+  }
+
   /**
    * Computes the initial user agreement for the given role and reference time. Also,
    * records the ID from a potential related IMEvent.
    *
-   * @param role                The role in the agreement
-   * @param referenceTimeMillis The reference time to consider for the agreement
+   * @param imEvent The user-creation IMEvent; it supplies the role and the reference time
    */
-  def initialUserAgreement(
-      role: String,
-      referenceTimeMillis: Long,
-      relatedIMEventID: Option[String]
-  ): UserAgreementModel = {
+  def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
+    require(MessageHelpers.isUserCreationIMEvent(imEvent))
+
+    val role = imEvent.getRole
+    val referenceTimeMillis = imEvent.getOccurredMillis
 
     // Just checking
-    assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
-
-    StdUserAgreement(
-      "<StandardUserAgreement>",
-      relatedIMEventID,
-      0,
-      Long.MaxValue,
-      role,
-      PolicyDefinedFullPriceTableRef
-    )
+    assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
+
+    ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
   }
 
-  def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
+  def initialUserBalance(role: String, referenceTimeMillis: Long): Real = {
     // FIXME: Where is the mapping?
-    1000.0
+    Real.Zero
   }
 
-  def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
+  def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
     // A resource type never changes charging behavior. By definition.
-    val className = resourceType.chargingBehavior
+    val className = resourceType.getChargingBehaviorClass
     _chargingBehaviorMap.get(className) match {
       case Some(chargingBehavior) ⇒
         chargingBehavior
@@ -315,7 +449,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
     }
   }
 
-  def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+  def defaultPolicyMsg: PolicyMsg = apply(EnvKeys.defaultPolicyMsg) // looked up under EnvKeys.defaultPolicyMsg
 
   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
 
@@ -382,7 +516,8 @@ object Aquarium {
     EnvKeys.eventBus,
     EnvKeys.restService,
     EnvKeys.rabbitMQService,
-    EnvKeys.storeWatcherService
+    EnvKeys.storeWatcherService,
+    EnvKeys.rabbitMQProducer
   )
 
   object EnvKeys {
@@ -485,7 +620,7 @@ object Aquarium {
     final val defaultClassLoader: TypedKey[ClassLoader] =
       new AquariumEnvKey[ClassLoader]("default.class.loader")
 
-    final val defaultPolicyModel: TypedKey[PolicyModel] =
-      new AquariumEnvKey[PolicyModel]("default.policy.model")
+    final val defaultPolicyMsg: TypedKey[PolicyMsg] =
+      new AquariumEnvKey[PolicyMsg]("default.policy.msg")
   }
 }