import com.ckkloverdos.env.Env
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
+import connector.rabbitmq.RabbitMQProducer
+import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
import java.io.File
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
import com.ckkloverdos.convert.Converters
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.{LoggerFactory, Logger}
-import gr.grnet.aquarium.computation.UserStateComputations
import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.ResourceLocator._
import com.ckkloverdos.sys.SysProp
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.util.date.TimeHelpers
/**
*
final class Aquarium(env: Env) extends Lifecycle with Loggable {
import Aquarium.EnvKeys
+ @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
+
+ private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
+ apply(EnvKeys.defaultPolicyModel),
+ apply(EnvKeys.storeProvider).policyStore
+ )
+
private[this] val _isStopping = new AtomicBoolean(false)
override def toString = "%s/v%s".format(getClass.getName, version)
}
}
- private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
+ private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
private[this] def startServices(): Unit = {
for(service ← _allServices) {
}
private[this] def showBasicConfiguration(): Unit = {
- logger.info("Aquarium Home = %s".format(
- if(Homes.Folders.AquariumHome.isAbsolute)
- Homes.Folders.AquariumHome
- else
- "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
- ))
-
for(folder ← this.eventsStoreFolder) {
logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
}
this.eventsStoreFolder.throwMe // on error
- for(prop ← Aquarium.PropsToShow) {
- logger.info("{} = {}", prop.name, prop.rawValue)
- }
-
- logger.info("CONF_HERE = {}", HERE)
- logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
- logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
- logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
-
- logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
+ logger.info("default policy = {}", defaultPolicyModel.toJsonString)
}
private[this] def addShutdownHooks(): Unit = {
/**
* Reflectively provide a new instance of a class and configure it appropriately.
*/
- def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
+ def newInstance[C <: AnyRef](_class: Class[C]): C = {
+ newInstance(_class.getName)
+ }
+
+ /**
+ * Reflectively provide a new instance of a class and configure it appropriately.
+ */
+ def newInstance[C <: AnyRef](className: String): C = {
val originalProps = apply(EnvKeys.originalProps)
val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
}
def currentResourceTypesMap: Map[String, ResourceType] = {
- // FIXME: Implement
- Map()
+ val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
+ if(policyOpt.isEmpty) {
+ throw new AquariumInternalError("Not even the default policy found")
+ }
+
+ policyOpt.get.resourceTypesMap
}
- def initialUserAgreementForRole(role: String, referenceTimeMillis: Long): UserAgreementModel = {
- // FIXME: Where is the mapping?
- StdUserAgreement("", None, Timespan(0L), defaultInitialUserRole, PolicyDefinedFullPriceTableRef)
+ def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
+ policyStore.loadValidPolicyAt(referenceTimeMillis) match {
+ case None ⇒
+ throw new AquariumInternalError(
+ "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
+ )
+
+ case Some(policy) ⇒
+ policy
+ }
}
- def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
+ def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
+ val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
+ policyAtReferenceTime.roleMapping.get(role) match {
+ case None ⇒
+ throw new AquariumInternalError("Unknown price table for role %s at %s".format(
+ role,
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+ ))
+
+ case Some(fullPriceTable) ⇒
+ fullPriceTable
+ }
+ }
+
+ /**
+ * Computes the initial user agreement for the given role and reference time. Also,
+ * records the ID from a potential related IMEvent.
+ *
+ * @param role The role in the agreement
+ * @param referenceTimeMillis The reference time to consider for the agreement
+ */
+ def initialUserAgreement(
+ role: String,
+ referenceTimeMillis: Long,
+ relatedIMEventID: Option[String]
+ ): UserAgreementModel = {
+
+ // Just checking
+ assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
+
+ StdUserAgreement(
+ "<StandardUserAgreement>",
+ relatedIMEventID,
+ 0,
+ Long.MaxValue,
+ role,
+ PolicyDefinedFullPriceTableRef()
+ )
+ }
+
+ def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
// FIXME: Where is the mapping?
- 10000.0
+ 0.0
}
- def defaultInitialUserRole: String = {
- // FIXME: Read from properties?
- "default"
+ def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
+ // A resource type never changes charging behavior. By definition.
+ val className = resourceType.chargingBehavior
+ _chargingBehaviorMap.get(className) match {
+ case Some(chargingBehavior) ⇒
+ chargingBehavior
+
+ case _ ⇒
+ try {
+ _chargingBehaviorMap synchronized {
+ val chargingBehavior = newInstance[ChargingBehavior](className)
+ _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
+ chargingBehavior
+ }
+ }
+ catch {
+ case e: Exception ⇒
+ throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
+ }
+ }
}
+ def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+
def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
- def resourceEventStore = apply(EnvKeys.resourceEventStore)
+ def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
- def imEventStore = apply(EnvKeys.imEventStore)
+ def imEventStore = apply(EnvKeys.storeProvider).imEventStore
- def userStateStore = apply(EnvKeys.userStateStore)
+ def userStateStore = apply(EnvKeys.storeProvider).userStateStore
- def policyStore = apply(EnvKeys.policyStore)
+ def policyStore = this.cachingPolicyStore
def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
def eventBus = apply(EnvKeys.eventBus)
- def userStateComputations = apply(EnvKeys.userStateComputations)
+ def chargingService = apply(EnvKeys.chargingService)
def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
}
final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
- override def toString = name
+ override def toString = "%s(%s)".format(manifest[T], name)
}
final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
final val adminCookie: TypedKey[Option[String]] =
new AquariumEnvKey[Option[String]]("admin.cookie")
- final val resourceEventStore: TypedKey[ResourceEventStore] =
- new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
-
- final val imEventStore: TypedKey[IMEventStore] =
- new AquariumEnvKey[IMEventStore]("im.event.store.class")
-
- final val userStateStore: TypedKey[UserStateStore] =
- new AquariumEnvKey[UserStateStore]("user.state.store.class")
-
- final val policyStore: TypedKey[PolicyStore] =
- new AquariumEnvKey[PolicyStore]("policy.store.class")
-
/**
* The class that initializes the REST service
*/
final val rabbitMQService: TypedKey[RabbitMQService] =
new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
+ final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
+ new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
+
final val storeWatcherService: TypedKey[StoreWatcherService] =
new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
final val converters: TypedKey[Converters] =
new AquariumEnvKey[Converters]("converters")
- final val userStateComputations: TypedKey[UserStateComputations] =
- new AquariumEnvKey[UserStateComputations]("user.state.computations")
+ final val chargingService: TypedKey[ChargingService] =
+ new AquariumEnvKey[ChargingService]("charging.service")
final val defaultClassLoader: TypedKey[ClassLoader] =
new AquariumEnvKey[ClassLoader]("default.class.loader")
+ final val defaultPolicyModel: TypedKey[PolicyModel] =
+ new AquariumEnvKey[PolicyModel]("default.policy.model")
}
}