Implement Once behavior with the new scheme. Refactor in the process
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
index f797f8f..fe606fd 100644 (file)
@@ -38,20 +38,20 @@ package gr.grnet.aquarium
 import com.ckkloverdos.env.Env
 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
 import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{StoreProvider}
+import connector.rabbitmq.RabbitMQProducer
+import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
 import java.io.File
 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
 import com.ckkloverdos.convert.Converters
 import java.util.concurrent.atomic.AtomicBoolean
 import org.slf4j.{LoggerFactory, Logger}
-import gr.grnet.aquarium.computation.UserStateComputations
 import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.ResourceLocator._
 import com.ckkloverdos.sys.SysProp
 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{PolicyHistory, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
-import gr.grnet.aquarium.charging.ChargingBehavior
+import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.util.date.TimeHelpers
 
 /**
  *
@@ -63,6 +63,11 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
 
   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
 
+  // Policy store returned by `policyStore`. It is created lazily, on first access, from the
+  // default policy model and the policy store of the configured store provider.
+  private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
+    apply(EnvKeys.defaultPolicyModel),
+    apply(EnvKeys.storeProvider).policyStore
+  )
+
   private[this] val _isStopping = new AtomicBoolean(false)
 
   override def toString = "%s/v%s".format(getClass.getName, version)
@@ -123,28 +128,12 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
   }
 
   private[this] def showBasicConfiguration(): Unit = {
+    // Logs the events store folder (when one is available) and the default policy as JSON.
+    // NOTE(review): `throwMe` presumably propagates a failed folder lookup, as its
+    // "on error" note suggests — confirm against the Maybe API.
-    logger.info("Aquarium Home = %s".format(
-      if(Homes.Folders.AquariumHome.isAbsolute)
-        Homes.Folders.AquariumHome
-      else
-        "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
-    ))
-
     for(folder ← this.eventsStoreFolder) {
       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
     }
     this.eventsStoreFolder.throwMe // on error
 
-    for(prop ← Aquarium.PropsToShow) {
-      logger.info("{} = {}", prop.name, prop.rawValue)
-    }
-
-    logger.info("CONF_HERE =  {}", HERE)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_JSON, ResourceLocator.Resources.PolicyJSONResource)
-
-    logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
+    logger.info("default policy = {}", defaultPolicyModel.toJsonString)
   }
 
   private[this] def addShutdownHooks(): Unit = {
@@ -239,43 +228,95 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
   }
 
   def currentResourceTypesMap: Map[String, ResourceType] = {
-    // FIXME: Implement
-    Map()
+    val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
+    if(policyOpt.isEmpty) {
+      throw new AquariumInternalError("Not even the default policy found")
+    }
+
+    policyOpt.get.resourceTypesMap
   }
 
-  def initialUserAgreementForRole(role: String, referenceTimeMillis: Long): UserAgreementModel = {
-    // FIXME: Where is the mapping?
-    StdUserAgreement("", None, Timespan(0L), defaultInitialUserRole, PolicyDefinedFullPriceTableRef)
+  /**
+   * Loads the policy that is valid at the given time from the policy store.
+   * Throws an [[AquariumInternalError]] if the store has no such policy.
+   *
+   * @param referenceTimeMillis The time, in milliseconds, at which the policy must be valid
+   */
+  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
+    val policyOpt = policyStore.loadValidPolicyAt(referenceTimeMillis)
+    policyOpt.getOrElse {
+      throw new AquariumInternalError(
+        "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
+      )
+    }
   }
 
-  def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
-    // FIXME: Where is the mapping?
-    10000.0
+  /**
+   * Looks up the full price table that the policy valid at the given time maps to `role`.
+   * Throws an [[AquariumInternalError]] if there is no valid policy at that time or if the
+   * policy has no mapping for the role.
+   *
+   * @param role                The role whose price table is requested
+   * @param referenceTimeMillis The time, in milliseconds, at which the policy must be valid
+   */
+  def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
+    val roleMapping = unsafeValidPolicyAt(referenceTimeMillis).roleMapping
+    roleMapping.get(role).getOrElse {
+      throw new AquariumInternalError("Unknown price table for role %s at %s".format(
+        role,
+        TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+      ))
+    }
   }
 
-  def defaultInitialUserRole: String = {
-    // FIXME: Read from properties?
-    "default"
+  /**
+   * Computes the initial user agreement for the given role and reference time. Also,
+   * records the ID from a potential related IMEvent.
+   *
+   * The role must have a price table in the policy that is valid at the reference time;
+   * otherwise the price table lookup throws an [[AquariumInternalError]].
+   *
+   * @param role                The role in the agreement
+   * @param referenceTimeMillis The reference time to consider for the agreement
+   * @param relatedIMEventID    The ID of the IMEvent related to this agreement, if any
+   * @return                    A [[StdUserAgreement]] for `role` that refers to the policy-defined price table
+   */
+  def initialUserAgreement(
+      role: String,
+      referenceTimeMillis: Long,
+      relatedIMEventID: Option[String]
+  ): UserAgreementModel = {
+
+    // Validate the role eagerly. The lookup is deliberately kept outside `assert`:
+    // `assert` is elidable, and an elided call does not evaluate its argument,
+    // which would silently skip the validation.
+    val priceTable = unsafePriceTableForRoleAt(role, referenceTimeMillis)
+    assert(null ne priceTable)
+
+    StdUserAgreement(
+      "<StandardUserAgreement>",
+      relatedIMEventID,
+      0, // NOTE(review): 0 and Long.MaxValue are presumably the validity bounds in millis — confirm against StdUserAgreement
+      Long.MaxValue,
+      role,
+      PolicyDefinedFullPriceTableRef
+    )
+  }
+
+  // Returns the initial user balance. Both parameters are currently ignored:
+  // the same hard-coded amount is returned for every role and reference time.
+  def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
+    // FIXME: Where is the mapping?
+    1000.0
   }
 
   def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
+    // A resource type never changes charging behavior. By definition.
     val className = resourceType.chargingBehavior
     _chargingBehaviorMap.get(className) match {
       case Some(chargingBehavior) ⇒
         chargingBehavior
 
       case _ ⇒
-        // It does not matter if this is entered by multiple threads and more than one instance of the same class
-        // is created. The returned instance is not meant to be cached.
-        val chargingBehavior = newInstance[ChargingBehavior](className)
-        _chargingBehaviorMap synchronized {
-          _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
+        try {
+          _chargingBehaviorMap synchronized {
+            val chargingBehavior = newInstance[ChargingBehavior](className)
+            _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
+            chargingBehavior
+          }
+        }
+        catch {
+          case e: Exception ⇒
+            throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
         }
-
-        chargingBehavior
     }
   }
 
+  // The default policy model, looked up under `EnvKeys.defaultPolicyModel`.
+  def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+
   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
 
   def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
@@ -284,13 +325,13 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
 
   def userStateStore = apply(EnvKeys.storeProvider).userStateStore
 
-  val policyStore = new PolicyHistory(apply(EnvKeys.storeProvider).policyStore)
+  // The policy store is the lazily created caching store, not the provider's store itself.
+  def policyStore = this.cachingPolicyStore
 
   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
 
   def eventBus = apply(EnvKeys.eventBus)
 
-  def userStateComputations = apply(EnvKeys.userStateComputations)
+  // The charging service, looked up under `EnvKeys.chargingService`.
+  def chargingService = apply(EnvKeys.chargingService)
 
   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
 
@@ -411,18 +452,6 @@ object Aquarium {
     final val adminCookie: TypedKey[Option[String]] =
       new AquariumEnvKey[Option[String]]("admin.cookie")
 
-//    final val resourceEventStore: TypedKey[ResourceEventStore] =
-//      new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
-
-//    final val imEventStore: TypedKey[IMEventStore] =
-//      new AquariumEnvKey[IMEventStore]("im.event.store.class")
-
-//    final val userStateStore: TypedKey[UserStateStore] =
-//      new AquariumEnvKey[UserStateStore]("user.state.store.class")
-
-//    final val policyStore: TypedKey[PolicyStore] =
-//      new AquariumEnvKey[PolicyStore]("policy.store.class")
-
     /**
      * The class that initializes the REST service
      */
@@ -441,17 +470,22 @@ object Aquarium {
     final val rabbitMQService: TypedKey[RabbitMQService] =
       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
 
+    // NOTE(review): the key is named "rabbitmq.client" although the value is a
+    // RabbitMQProducer — confirm that the naming mismatch is intended.
+    final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
+      new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
+
     final val storeWatcherService: TypedKey[StoreWatcherService] =
       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
 
     final val converters: TypedKey[Converters] =
       new AquariumEnvKey[Converters]("converters")
 
-    final val userStateComputations: TypedKey[UserStateComputations] =
-      new AquariumEnvKey[UserStateComputations]("user.state.computations")
+    // Key of the charging service; takes the place of the removed `userStateComputations` key.
+    final val chargingService: TypedKey[ChargingService] =
+      new AquariumEnvKey[ChargingService]("charging.service")
 
     final val defaultClassLoader: TypedKey[ClassLoader] =
       new AquariumEnvKey[ClassLoader]("default.class.loader")
 
+    // Key of the default policy model.
+    final val defaultPolicyModel: TypedKey[PolicyModel] =
+      new AquariumEnvKey[PolicyModel]("default.policy.model")
   }
 }