X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/10c878190c3c2f60e11493d94a5ba68750b1cb3e..971db49d904a419b253e32fdc273c33b1613e743:/src/main/scala/gr/grnet/aquarium/Aquarium.scala diff --git a/src/main/scala/gr/grnet/aquarium/Aquarium.scala b/src/main/scala/gr/grnet/aquarium/Aquarium.scala index b80a3d7..19c1c71 100644 --- a/src/main/scala/gr/grnet/aquarium/Aquarium.scala +++ b/src/main/scala/gr/grnet/aquarium/Aquarium.scala @@ -35,23 +35,29 @@ package gr.grnet.aquarium +import com.ckkloverdos.convert.Converters import com.ckkloverdos.env.Env import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey} +import com.ckkloverdos.maybe._ import com.ckkloverdos.props.Props +import com.ckkloverdos.sys.SysProp import connector.rabbitmq.RabbitMQProducer -import gr.grnet.aquarium.store.{PolicyStore, StoreProvider} -import java.io.File -import gr.grnet.aquarium.util.{Loggable, Lifecycle} +import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior} +import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers} +import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType} +import gr.grnet.aquarium.service.event.AquariumCreatedEvent import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService} -import com.ckkloverdos.convert.Converters +import gr.grnet.aquarium.store.StoreProvider +import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.util.{Loggable, Lifecycle} +import java.io.File import java.util.concurrent.atomic.AtomicBoolean import org.slf4j.{LoggerFactory, Logger} -import com.ckkloverdos.maybe._ -import com.ckkloverdos.sys.SysProp -import gr.grnet.aquarium.service.event.AquariumCreatedEvent -import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType} -import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior} -import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.event.CreditsModel +import gr.grnet.aquarium.charging.state.UserStateBootstrap +import java.util.{Map ⇒ JMap} +import java.util.{HashMap ⇒ JHashMap} /** * @@ -59,12 +65,16 @@ import gr.grnet.aquarium.util.date.TimeHelpers */ final class Aquarium(env: Env) extends Lifecycle with Loggable { + import Aquarium.EnvKeys @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]() + // Caching value for the latest resource mapping + @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping + private[this] lazy val cachingPolicyStore = new CachingPolicyStore( - apply(EnvKeys.defaultPolicyModel), + apply(EnvKeys.defaultPolicyMsg), apply(EnvKeys.storeProvider).policyStore ) @@ -107,7 +117,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable { } } - private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_)) + private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_)) private[this] def startServices(): Unit = { for(service ← _allServices) { @@ -133,7 +143,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable { } this.eventsStoreFolder.throwMe // on error - logger.info("default policy = {}", defaultPolicyModel.toJsonString) + logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg)) } private[this] def addShutdownHooks(): Unit = { @@ -227,30 +237,70 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable { } - def currentResourceTypesMap: Map[String, ResourceType] = { - val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis()) - if(policyOpt.isEmpty) { - throw new AquariumInternalError("Not even the default policy found") + /** + * @deprecated Use `currentResourceMapping` instead + */ + def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = { + val policyMspOpt = policyStore.loadPolicyAt(millis) + if(policyMspOpt.isEmpty) { + throw new AquariumInternalError( + "Cannot get resource mapping. Not even the default policy found for time %s", + TimeHelpers.toYYYYMMDDHHMMSSSSS(millis) + ) } - policyOpt.get.resourceTypesMap + val policyMsg = policyMspOpt.get + policyMsg.getResourceMapping + } + + /** + * Provides the current resource mapping. This value is cached. + * + * NOTE: The assumption is that the resource mapping is always updated with new keys, + * that is we allow only the addition of new resource types. + */ + def currentResourceMapping = { + this._resourceMapping synchronized this._resourceMapping } - def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = { - policyStore.loadValidPolicyAt(referenceTimeMillis) match { + // def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = { +// val policyMspOpt = policyStore.loadPolicyAt(millis) +// if(policyMspOpt.isEmpty) { +// throw new AquariumInternalError( +// "Cannot get resource types map. Not even the default policy found for time %s", +// TimeHelpers.toYYYYMMDDHHMMSSSSS(millis) +// ) +// } +// +// val policyMsg = policyMspOpt.get +// // TODO optimize +// ModelFactory.newPolicyModel(policyMsg).resourceTypesMap +// } +// +// def currentResourceTypesMap: Map[String, ResourceType] = { +// resourceTypesMapAtMillis(TimeHelpers.nowMillis()) +// } + + def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = { + policyStore.loadPolicyAt(referenceTimeMillis) match { case None ⇒ throw new AquariumInternalError( "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)) ) - case Some(policy) ⇒ - policy + case Some(policyMsg) ⇒ + ModelFactory.newPolicyModel(policyMsg) } } - def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = { - val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis) - policyAtReferenceTime.roleMapping.get(role) match { + def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = { + unsafeValidPolicyModelAt(referenceTimeMillis).msg + } + + def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = { + val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis) + + policyModelAtReferenceTime.roleMapping.get(role) match { case None ⇒ throw new AquariumInternalError("Unknown price table for role %s at %s".format( role, @@ -262,40 +312,133 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable { } } + def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = { + val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis) + policyAtReferenceTime.getRoleMapping.get(role) match { + case null ⇒ + throw new AquariumInternalError("Unknown price table for role %s at %s".format( + role, + TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis) + )) + + case fullPriceTable ⇒ + fullPriceTable + } + } + + def unsafeFullPriceTableModelForAgreement( + userAgreementModel: UserAgreementModel, + knownPolicyModel: PolicyModel + ): FullPriceTableModel = { + val policyModel = knownPolicyModel match { + case null ⇒ + unsafeValidPolicyModelAt(userAgreementModel.validFromMillis) + + case policyModel ⇒ + policyModel + } + + userAgreementModel.fullPriceTableRef match { + case PolicyDefinedFullPriceTableRef ⇒ + val role = userAgreementModel.role + policyModel.roleMapping.get(role) match { + case None ⇒ + throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s", + role, + userAgreementModel.userID, + TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis) + ) + + case Some(fullPriceTable) ⇒ + fullPriceTable + } + + case AdHocFullPriceTableRef(fullPriceTable) ⇒ + fullPriceTable + } + } + + def unsafeFullPriceTableForAgreement( + userAgreement: UserAgreementMsg, + knownPolicyModel: PolicyModel + ): FullPriceTableMsg = { + + val policyModel = knownPolicyModel match { + case null ⇒ + unsafeValidPolicyModelAt(userAgreement.getValidFromMillis) + + case policyModel ⇒ + policyModel + } + + unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg) + } + + def unsafeFullPriceTableForAgreement( + userAgreement: UserAgreementMsg, + knownPolicy: PolicyMsg + ): FullPriceTableMsg = { + val policy = knownPolicy match { + case null ⇒ + unsafeValidPolicyAt(userAgreement.getValidFromMillis) + + case policy ⇒ + policy + } + + val role = userAgreement.getRole + userAgreement.getFullPriceTableRef match { + case null ⇒ + policy.getRoleMapping.get(role) match { + case null ⇒ + throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s", + role, + userAgreement.getUserID, + TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis) + ) + + case fullPriceTable ⇒ + fullPriceTable + } + + case fullPriceTable ⇒ + fullPriceTable + } + } + /** * Computes the initial user agreement for the given role and reference time. Also, * records the ID from a potential related IMEvent. * - * @param role The role in the agreement - * @param referenceTimeMillis The reference time to consider for the agreement + * @param imEvent The IMEvent that creates the user */ - def initialUserAgreement( - role: String, - referenceTimeMillis: Long, - relatedIMEventID: Option[String] - ): UserAgreementModel = { + def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = { + require(MessageHelpers.isIMEventCreate(imEvent)) + + val role = imEvent.getRole + val referenceTimeMillis = imEvent.getOccurredMillis // Just checking - assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis)) - - StdUserAgreement( - "", - relatedIMEventID, - 0, - Long.MaxValue, - role, - PolicyDefinedFullPriceTableRef() - ) + assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis)) + + ModelFactory.newUserAgreementModelFromIMEvent(imEvent) } - def initialUserBalance(role: String, referenceTimeMillis: Long): Double = { + def initialUserBalance(role: String, referenceTimeMillis: Long): CreditsModel.Type = { // FIXME: Where is the mapping? - 0.0 + CreditsModel.from(0.0) + } + + def getUserStateBootstrap(imEvent: IMEventMsg): UserStateBootstrap = { + UserStateBootstrap( + this.initialUserAgreement(imEvent), + this.initialUserBalance(imEvent.getRole, imEvent.getOccurredMillis) + ) } - def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = { + def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = { // A resource type never changes charging behavior. By definition. - val className = resourceType.chargingBehavior + val className = resourceType.getChargingBehaviorClass _chargingBehaviorMap.get(className) match { case Some(chargingBehavior) ⇒ chargingBehavior @@ -315,7 +458,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable { } } - def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel) + def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg) def defaultClassLoader = apply(EnvKeys.defaultClassLoader) @@ -382,7 +525,8 @@ object Aquarium { EnvKeys.eventBus, EnvKeys.restService, EnvKeys.rabbitMQService, - EnvKeys.storeWatcherService + EnvKeys.storeWatcherService, + EnvKeys.rabbitMQProducer ) object EnvKeys { @@ -485,7 +629,7 @@ object Aquarium { final val defaultClassLoader: TypedKey[ClassLoader] = new AquariumEnvKey[ClassLoader]("default.class.loader") - final val defaultPolicyModel: TypedKey[PolicyModel] = - new AquariumEnvKey[PolicyModel]("default.policy.model") + final val defaultPolicyMsg: TypedKey[PolicyMsg] = + new AquariumEnvKey[PolicyMsg]("default.policy.msg") } }