Implement Continuous behavior with the new scheme
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
index a31281e..b80a3d7 100644 (file)
@@ -38,19 +38,20 @@ package gr.grnet.aquarium
 import com.ckkloverdos.env.Env
 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
 import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
+import connector.rabbitmq.RabbitMQProducer
+import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
 import java.io.File
 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
 import com.ckkloverdos.convert.Converters
 import java.util.concurrent.atomic.AtomicBoolean
 import org.slf4j.{LoggerFactory, Logger}
-import gr.grnet.aquarium.computation.UserStateComputations
 import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.ResourceLocator._
 import com.ckkloverdos.sys.SysProp
 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.util.date.TimeHelpers
 
 /**
  *
@@ -60,6 +61,13 @@ import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreemen
 final class Aquarium(env: Env) extends Lifecycle with Loggable {
   import Aquarium.EnvKeys
 
+  // Charging behavior instances, keyed by the class name they were loaded from.
+  // The map itself is immutable; the reference is replaced on every addition.
+  @volatile private[this] var _chargingBehaviorMap = Map.empty[String, ChargingBehavior]
+
+  // Lazily built policy store that wraps the store provider's policy store and
+  // is given the configured default policy model.
+  private[this] lazy val cachingPolicyStore = {
+    val defaultPolicy = apply(EnvKeys.defaultPolicyModel)
+    val underlyingPolicyStore = apply(EnvKeys.storeProvider).policyStore
+    new CachingPolicyStore(defaultPolicy, underlyingPolicyStore)
+  }
+
   private[this] val _isStopping = new AtomicBoolean(false)
 
   override def toString = "%s/v%s".format(getClass.getName, version)
@@ -99,7 +107,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
     }
   }
 
-  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
+  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
 
   private[this] def startServices(): Unit = {
     for(service ← _allServices) {
@@ -120,28 +128,12 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
   }
 
   private[this] def showBasicConfiguration(): Unit = {
-    logger.info("Aquarium Home = %s".format(
-      if(Homes.Folders.AquariumHome.isAbsolute)
-        Homes.Folders.AquariumHome
-      else
-        "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
-    ))
-
     for(folder ← this.eventsStoreFolder) {
       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
     }
     this.eventsStoreFolder.throwMe // on error
 
-    for(prop ← Aquarium.PropsToShow) {
-      logger.info("{} = {}", prop.name, prop.rawValue)
-    }
-
-    logger.info("CONF_HERE =  {}", HERE)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
-    logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
-
-    logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
+    logger.info("default policy = {}", defaultPolicyModel.toJsonString)
   }
 
   private[this] def addShutdownHooks(): Unit = {
@@ -179,7 +171,14 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
   /**
    * Reflectively provide a new instance of a class and configure it appropriately.
    */
-  def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
+  def newInstance[C <: AnyRef](_class: Class[C]): C = {
+    // Delegate to the name-based overload using the class' fully qualified name.
+    val className = _class.getName
+    newInstance[C](className)
+  }
+
+  /**
+   * Reflectively provide a new instance of a class and configure it appropriately.
+   */
+  def newInstance[C <: AnyRef](className: String): C = {
     val originalProps = apply(EnvKeys.originalProps)
 
     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
@@ -229,40 +228,110 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
   }
 
   def currentResourceTypesMap: Map[String, ResourceType] = {
-    // FIXME: Implement
-    Map()
+    val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
+    if(policyOpt.isEmpty) {
+      throw new AquariumInternalError("Not even the default policy found")
+    }
+
+    policyOpt.get.resourceTypesMap
   }
 
-  def initialUserAgreementForRole(role: String, referenceTimeMillis: Long): UserAgreementModel = {
-    // FIXME: Where is the mapping?
-    StdUserAgreement("", None, Timespan(0L), defaultInitialUserRole, PolicyDefinedFullPriceTableRef)
+  /**
+   * Loads the policy that the policy store reports as valid at the given time.
+   * It is "unsafe" in the sense that it throws instead of returning an `Option`.
+   *
+   * @param referenceTimeMillis The time, in milliseconds, at which the policy must be valid
+   * @return The policy found for that time
+   * @throws AquariumInternalError if the policy store returns no policy for that time
+   */
+  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
+    val policyOpt = policyStore.loadValidPolicyAt(referenceTimeMillis)
+    policyOpt.getOrElse {
+      throw new AquariumInternalError(
+        "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
+      )
+    }
   }
 
-  def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
+  /**
+   * Looks up the full price table mapped to the given role in the policy that
+   * is valid at the given time.
+   *
+   * @param role                The role whose price table is requested
+   * @param referenceTimeMillis The time, in milliseconds, used to select the policy
+   * @return The price table the policy's role mapping holds for the role
+   * @throws AquariumInternalError if no policy is valid at that time or the
+   *                               policy has no mapping for the role
+   */
+  def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
+    val roleMapping = unsafeValidPolicyAt(referenceTimeMillis).roleMapping
+    roleMapping.get(role).getOrElse {
+      throw new AquariumInternalError("Unknown price table for role %s at %s".format(
+        role,
+        TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+      ))
+    }
+  }
+
+  /**
+   * Computes the initial user agreement for the given role and reference time. Also,
+   * records the ID from a potential related IMEvent.
+   *
+   * The role is validated first: a price table must exist for it in the policy that
+   * is valid at the reference time, otherwise the lookup throws.
+   *
+   * @param role                The role in the agreement
+   * @param referenceTimeMillis The reference time to consider for the agreement
+   * @param relatedIMEventID    The ID of the IMEvent related to this agreement, if any
+   * @return A standard user agreement for the role that refers to the policy-defined price table
+   * @throws AquariumInternalError if no policy or no price table for the role exists
+   *                               at the reference time
+   */
+  def initialUserAgreement(
+      role: String,
+      referenceTimeMillis: Long,
+      relatedIMEventID: Option[String]
+  ): UserAgreementModel = {
+
+    // Perform the lookup outside of assert(): assert is elidable, and when it is
+    // elided its argument is not evaluated, which would silently skip the
+    // validation of the role.
+    val priceTable = unsafePriceTableForRoleAt(role, referenceTimeMillis)
+    // Just checking
+    assert(null ne priceTable)
+
+    StdUserAgreement(
+      "<StandardUserAgreement>",
+      relatedIMEventID,
+      0,
+      Long.MaxValue,
+      role,
+      PolicyDefinedFullPriceTableRef()
+    )
+  }
+
+  def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
     // FIXME: Where is the mapping?
-    10000.0
+    0.0
   }
 
-  def defaultInitialUserRole: String = {
-    // FIXME: Read from properties?
-    "default"
+  /**
+   * Provides the charging behavior for the given resource type. Instances are
+   * created reflectively from the class name the resource type declares and are
+   * then cached per class name.
+   *
+   * @param resourceType The resource type whose charging behavior is requested
+   * @return The cached or newly created charging behavior
+   * @throws AquariumInternalError if the charging behavior cannot be instantiated
+   */
+  def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
+    // A resource type never changes charging behavior. By definition.
+    val className = resourceType.chargingBehavior
+    // Fast path: read the current map without taking a lock.
+    _chargingBehaviorMap.get(className) match {
+      case Some(chargingBehavior) ⇒
+        chargingBehavior
+
+      case None ⇒
+        try {
+          // Lock on a stable monitor. The map reference is replaced on every
+          // update, so locking on the map itself would let threads hold
+          // different monitors and race on the read-modify-write below.
+          this synchronized {
+            // Re-check under the lock: another thread may have added the entry
+            // after the unlocked lookup above.
+            _chargingBehaviorMap.get(className) match {
+              case Some(chargingBehavior) ⇒
+                chargingBehavior
+
+              case None ⇒
+                val chargingBehavior = newInstance[ChargingBehavior](className)
+                _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
+                chargingBehavior
+            }
+          }
+        }
+        catch {
+          case e: Exception ⇒
+            throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
+        }
+    }
   }
 
+  def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+
   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
 
-  def resourceEventStore = apply(EnvKeys.resourceEventStore)
+  def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
 
-  def imEventStore = apply(EnvKeys.imEventStore)
+  def imEventStore = apply(EnvKeys.storeProvider).imEventStore
 
-  def userStateStore = apply(EnvKeys.userStateStore)
+  def userStateStore = apply(EnvKeys.storeProvider).userStateStore
 
-  def policyStore = apply(EnvKeys.policyStore)
+  def policyStore = this.cachingPolicyStore
 
   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
 
   def eventBus = apply(EnvKeys.eventBus)
 
-  def userStateComputations = apply(EnvKeys.userStateComputations)
+  def chargingService = apply(EnvKeys.chargingService)
 
   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
 
@@ -304,7 +373,7 @@ object Aquarium {
  }
 
   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
-    override def toString = name
+    override def toString = "%s(%s)".format(manifest[T], name)
   }
 
   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
@@ -383,18 +452,6 @@ object Aquarium {
     final val adminCookie: TypedKey[Option[String]] =
       new AquariumEnvKey[Option[String]]("admin.cookie")
 
-    final val resourceEventStore: TypedKey[ResourceEventStore] =
-      new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
-
-    final val imEventStore: TypedKey[IMEventStore] =
-      new AquariumEnvKey[IMEventStore]("im.event.store.class")
-
-    final val userStateStore: TypedKey[UserStateStore] =
-      new AquariumEnvKey[UserStateStore]("user.state.store.class")
-
-    final val policyStore: TypedKey[PolicyStore] =
-      new AquariumEnvKey[PolicyStore]("policy.store.class")
-
     /**
      * The class that initializes the REST service
      */
@@ -413,17 +470,22 @@ object Aquarium {
     final val rabbitMQService: TypedKey[RabbitMQService] =
       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
 
+    final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
+      new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
+
     final val storeWatcherService: TypedKey[StoreWatcherService] =
       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
 
     final val converters: TypedKey[Converters] =
       new AquariumEnvKey[Converters]("converters")
 
-    final val userStateComputations: TypedKey[UserStateComputations] =
-      new AquariumEnvKey[UserStateComputations]("user.state.computations")
+    final val chargingService: TypedKey[ChargingService] =
+      new AquariumEnvKey[ChargingService]("charging.service")
 
     final val defaultClassLoader: TypedKey[ClassLoader] =
       new AquariumEnvKey[ClassLoader]("default.class.loader")
 
+    final val defaultPolicyModel: TypedKey[PolicyModel] =
+      new AquariumEnvKey[PolicyModel]("default.policy.model")
   }
 }