package gr.grnet.aquarium
+import com.ckkloverdos.convert.Converters
import com.ckkloverdos.env.Env
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
-import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
-import java.io.File
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
-import com.ckkloverdos.convert.Converters
-import java.util.concurrent.atomic.AtomicBoolean
-import org.slf4j.{LoggerFactory, Logger}
import com.ckkloverdos.maybe._
+import com.ckkloverdos.props.Props
import com.ckkloverdos.sys.SysProp
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
+import connector.rabbitmq.RabbitMQProducer
import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
+import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
+import gr.grnet.aquarium.store.StoreProvider
import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import java.io.File
+import java.util.concurrent.atomic.AtomicBoolean
+import org.slf4j.{LoggerFactory, Logger}
+import gr.grnet.aquarium.event.CreditsModel
+import gr.grnet.aquarium.charging.state.UserStateBootstrap
+import java.util.{Map ⇒ JMap}
+import java.util.{HashMap ⇒ JHashMap}
/**
*
*/
final class Aquarium(env: Env) extends Lifecycle with Loggable {
+
import Aquarium.EnvKeys
@volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
+ // Caching value for the latest resource mapping
+ @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping
+
private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
- apply(EnvKeys.defaultPolicyModel),
+ apply(EnvKeys.defaultPolicyMsg),
apply(EnvKeys.storeProvider).policyStore
)
}
}
- private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
+ private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
private[this] def startServices(): Unit = {
for(service ← _allServices) {
}
this.eventsStoreFolder.throwMe // on error
- logger.info("default policy = {}", defaultPolicyModel.toJsonString)
+ logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
}
private[this] def addShutdownHooks(): Unit = {
}
- def currentResourceTypesMap: Map[String, ResourceType] = {
- val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
- if(policyOpt.isEmpty) {
- throw new AquariumInternalError("Not even the default policy found")
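+ /**
+  * Loads the policy for the given time and returns its resource mapping.
+  * Throws an `AquariumInternalError` if no policy is found for that time.
+  *
+  * @deprecated Use `currentResourceMapping` instead
+  */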
+ def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
+ val policyMspOpt = policyStore.loadPolicyAt(millis)
+ if(policyMspOpt.isEmpty) {
+ throw new AquariumInternalError(
+ "Cannot get resource mapping. Not even the default policy found for time %s",
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+ )
}
- policyOpt.get.resourceTypesMap
+ val policyMsg = policyMspOpt.get
+ policyMsg.getResourceMapping
}
- def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
- policyStore.loadValidPolicyAt(referenceTimeMillis) match {
+ /**
+  * Provides the current resource mapping. This value is cached.
+  *
+  * The cached reference is held in a `@volatile` field, so a plain read already observes the
+  * most recently published mapping. No lock is taken here: locking on the current value of a
+  * reassignable field would use a different monitor after every reassignment and therefore
+  * would not guard anything.
+  *
+  * NOTE: The assumption is that the resource mapping is always updated with new keys,
+  * that is we allow only the addition of new resource types.
+  *
+  * @return the cached resource mapping
+  */
+ def currentResourceMapping: JMap[String, ResourceTypeMsg] = {
+   this._resourceMapping
+ }
+
+ // def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
+// val policyMspOpt = policyStore.loadPolicyAt(millis)
+// if(policyMspOpt.isEmpty) {
+// throw new AquariumInternalError(
+// "Cannot get resource types map. Not even the default policy found for time %s",
+// TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+// )
+// }
+//
+// val policyMsg = policyMspOpt.get
+// // TODO optimize
+// ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
+// }
+//
+// def currentResourceTypesMap: Map[String, ResourceType] = {
+// resourceTypesMapAtMillis(TimeHelpers.nowMillis())
+// }
+
+ def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
+ policyStore.loadPolicyAt(referenceTimeMillis) match {
case None ⇒
throw new AquariumInternalError(
"No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
)
- case Some(policy) ⇒
- policy
+ case Some(policyMsg) ⇒
+ ModelFactory.newPolicyModel(policyMsg)
}
}
- def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
- val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
- policyAtReferenceTime.roleMapping.get(role) match {
+ /**
+  * Returns the policy message for the given time.
+  *
+  * Delegates to `unsafeValidPolicyModelAt` and returns the `msg` of the resulting model;
+  * any error raised by that lookup propagates to the caller.
+  *
+  * @param referenceTimeMillis the time, in milliseconds, for which the policy is looked up
+  * @return the `msg` of the policy model found for that time
+  */
+ def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = {
+   val policyModel = unsafeValidPolicyModelAt(referenceTimeMillis)
+   policyModel.msg
+ }
+
+ def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
+ val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
+
+ policyModelAtReferenceTime.roleMapping.get(role) match {
case None ⇒
throw new AquariumInternalError("Unknown price table for role %s at %s".format(
role,
}
}
+ /**
+  * Looks up the full price table mapped to `role` in the policy found for the given time.
+  *
+  * @param role                the role whose price table is requested
+  * @param referenceTimeMillis the time, in milliseconds, for which the policy is looked up
+  * @return the price table the policy's role mapping holds for `role`
+  * @throws AquariumInternalError if the role mapping has no entry for `role`
+  */
+ def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
+   val roleMapping = unsafeValidPolicyAt(referenceTimeMillis).getRoleMapping
+
+   // The Java map answers null for a missing key; Option(...) turns that into None.
+   Option(roleMapping.get(role)).getOrElse {
+     throw new AquariumInternalError("Unknown price table for role %s at %s".format(
+       role,
+       TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+     ))
+   }
+ }
+
+ /**
+  * Resolves the full price table model that applies to a user agreement.
+  *
+  * A policy-defined reference is resolved through the policy's role mapping, using the role
+  * of the agreement; an ad-hoc reference already carries its own price table.
+  *
+  * @param userAgreementModel the agreement whose price table is requested
+  * @param knownPolicyModel   the policy to use, or `null` to look up the policy at the
+  *                           agreement's `validFromMillis`
+  * @return the price table model for the agreement
+  * @throws AquariumInternalError if the reference is policy-defined and the policy has no
+  *                               entry for the agreement's role
+  */
+ def unsafeFullPriceTableModelForAgreement(
+     userAgreementModel: UserAgreementModel,
+     knownPolicyModel: PolicyModel
+ ): FullPriceTableModel = {
+   // Fall back to a policy lookup only when the caller did not supply one.
+   val effectivePolicyModel = Option(knownPolicyModel).getOrElse {
+     unsafeValidPolicyModelAt(userAgreementModel.validFromMillis)
+   }
+
+   userAgreementModel.fullPriceTableRef match {
+     case PolicyDefinedFullPriceTableRef ⇒
+       val role = userAgreementModel.role
+       effectivePolicyModel.roleMapping.get(role).getOrElse {
+         throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
+           role,
+           userAgreementModel.userID,
+           TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
+         )
+       }
+
+     case AdHocFullPriceTableRef(adHocPriceTable) ⇒
+       adHocPriceTable
+   }
+ }
+
+ /**
+  * Resolves the full price table for a user agreement, given an optional policy model.
+  *
+  * Picks the policy model to use and then delegates to the overload that takes a policy
+  * message, passing the model's `msg`.
+  *
+  * @param userAgreement    the agreement whose price table is requested
+  * @param knownPolicyModel the policy model to use, or `null` to look up the policy at the
+  *                         agreement's valid-from time
+  * @return the price table resolved by the policy-message overload
+  */
+ def unsafeFullPriceTableForAgreement(
+     userAgreement: UserAgreementMsg,
+     knownPolicyModel: PolicyModel
+ ): FullPriceTableMsg = {
+   // Fall back to a policy lookup only when the caller did not supply one.
+   val effectivePolicyModel = Option(knownPolicyModel).getOrElse {
+     unsafeValidPolicyModelAt(userAgreement.getValidFromMillis)
+   }
+
+   unsafeFullPriceTableForAgreement(userAgreement, effectivePolicyModel.msg)
+ }
+
+ /**
+  * Resolves the full price table for a user agreement, given an optional policy message.
+  *
+  * If the agreement carries its own full price table reference, that value is returned.
+  * Otherwise the table is taken from the policy's role mapping, keyed by the agreement's role.
+  *
+  * @param userAgreement the agreement whose price table is requested
+  * @param knownPolicy   the policy to use, or `null` to look up the policy at the agreement's
+  *                      valid-from time
+  * @return the price table for the agreement
+  * @throws AquariumInternalError if the agreement has no price table reference and the policy
+  *                               has no entry for the agreement's role
+  */
+ def unsafeFullPriceTableForAgreement(
+     userAgreement: UserAgreementMsg,
+     knownPolicy: PolicyMsg
+ ): FullPriceTableMsg = {
+   // Fall back to a policy lookup only when the caller did not supply one.
+   val effectivePolicy = Option(knownPolicy).getOrElse {
+     unsafeValidPolicyAt(userAgreement.getValidFromMillis)
+   }
+   val role = userAgreement.getRole
+
+   // A null reference on the agreement means "use whatever the policy defines for the role".
+   Option(userAgreement.getFullPriceTableRef).getOrElse {
+     Option(effectivePolicy.getRoleMapping.get(role)).getOrElse {
+       throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
+         role,
+         userAgreement.getUserID,
+         TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
+       )
+     }
+   }
+ }
+
/**
* Computes the initial user agreement for the given role and reference time. Also,
* records the ID from a potential related IMEvent.
*
- * @param role The role in the agreement
- * @param referenceTimeMillis The reference time to consider for the agreement
+ * @param imEvent The IMEvent that creates the user
*/
- def initialUserAgreement(
- role: String,
- referenceTimeMillis: Long,
- relatedIMEventID: Option[String]
- ): UserAgreementModel = {
+ def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
+ require(MessageHelpers.isIMEventCreate(imEvent))
+
+ val role = imEvent.getRole
+ val referenceTimeMillis = imEvent.getOccurredMillis
// Just checking
- assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
-
- StdUserAgreement(
- "<StandardUserAgreement>",
- relatedIMEventID,
- 0,
- Long.MaxValue,
- role,
- PolicyDefinedFullPriceTableRef
- )
+ assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
+
+ ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
}
- def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
+ def initialUserBalance(role: String, referenceTimeMillis: Long): CreditsModel.Type = {
// FIXME: Where is the mapping?
- 1000.0
+ CreditsModel.from(0.0)
}
- def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
- val className = resourceType.chargingBehavior
+ /**
+  * Builds the bootstrap data for the state of the user described by the given IMEvent.
+  *
+  * Combines the initial user agreement computed from the event with the initial balance
+  * computed from the event's role and occurrence time.
+  *
+  * @param imEvent the IMEvent the bootstrap is derived from
+  * @return a `UserStateBootstrap` holding the initial agreement and the initial balance
+  */
+ def getUserStateBootstrap(imEvent: IMEventMsg): UserStateBootstrap = {
+   // Agreement first, balance second: the same evaluation order as the constructor arguments.
+   val initialAgreement = this.initialUserAgreement(imEvent)
+   val initialBalance = this.initialUserBalance(imEvent.getRole, imEvent.getOccurredMillis)
+
+   UserStateBootstrap(initialAgreement, initialBalance)
+ }
+
+ def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
+ // A resource type never changes charging behavior. By definition.
+ val className = resourceType.getChargingBehaviorClass
_chargingBehaviorMap.get(className) match {
case Some(chargingBehavior) ⇒
chargingBehavior
case _ ⇒
- // It does not matter if this is entered by multiple threads and more than one instance of the same class
- // is created. The returned instance is not meant to be cached.
try {
- val chargingBehavior = newInstance[ChargingBehavior](className)
_chargingBehaviorMap synchronized {
+ val chargingBehavior = newInstance[ChargingBehavior](className)
_chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
+ chargingBehavior
}
-
- chargingBehavior
}
catch {
case e: Exception ⇒
}
}
- def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+ def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg)
def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
EnvKeys.eventBus,
EnvKeys.restService,
EnvKeys.rabbitMQService,
- EnvKeys.storeWatcherService
+ EnvKeys.storeWatcherService,
+ EnvKeys.rabbitMQProducer
)
object EnvKeys {
final val rabbitMQService: TypedKey[RabbitMQService] =
new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
+ final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
+ new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
+
final val storeWatcherService: TypedKey[StoreWatcherService] =
new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
final val defaultClassLoader: TypedKey[ClassLoader] =
new AquariumEnvKey[ClassLoader]("default.class.loader")
- final val defaultPolicyModel: TypedKey[PolicyModel] =
- new AquariumEnvKey[PolicyModel]("default.policy.model")
+ final val defaultPolicyMsg: TypedKey[PolicyMsg] =
+ new AquariumEnvKey[PolicyMsg]("default.policy.msg")
}
}