package gr.grnet.aquarium
+import com.ckkloverdos.convert.Converters
import com.ckkloverdos.env.Env
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
+import com.ckkloverdos.maybe._
import com.ckkloverdos.props.Props
+import com.ckkloverdos.sys.SysProp
import connector.rabbitmq.RabbitMQProducer
-import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
-import java.io.File
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
+import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
+import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
+import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
-import com.ckkloverdos.convert.Converters
+import gr.grnet.aquarium.store.StoreProvider
+import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.{LoggerFactory, Logger}
-import com.ckkloverdos.maybe._
-import com.ckkloverdos.sys.SysProp
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
-import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
-import gr.grnet.aquarium.util.date.TimeHelpers
+import java.util.{Map ⇒ JMap}
+import java.util.{HashMap ⇒ JHashMap}
/**
*
*/
final class Aquarium(env: Env) extends Lifecycle with Loggable {
+
import Aquarium.EnvKeys
@volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
+ // Caching value for the latest resource mapping
+ @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping
+
private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
- apply(EnvKeys.defaultPolicyModel),
+ apply(EnvKeys.defaultPolicyMsg),
apply(EnvKeys.storeProvider).policyStore
)
}
}
- private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
+ private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
private[this] def startServices(): Unit = {
for(service ← _allServices) {
}
this.eventsStoreFolder.throwMe // on error
- logger.info("default policy = {}", defaultPolicyModel.toJsonString)
+ logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
}
private[this] def addShutdownHooks(): Unit = {
}
- def currentResourceTypesMap: Map[String, ResourceType] = {
- val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
- if(policyOpt.isEmpty) {
- throw new AquariumInternalError("Not even the default policy found")
+ /**
+ * @deprecated Use `currentResourceMapping` instead
+ */
+ def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
+ val policyMspOpt = policyStore.loadPolicyAt(millis)
+ if(policyMspOpt.isEmpty) {
+ throw new AquariumInternalError(
+ "Cannot get resource mapping. Not even the default policy found for time %s",
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+ )
}
- policyOpt.get.resourceTypesMap
+ val policyMsg = policyMspOpt.get
+ policyMsg.getResourceMapping
}
- def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
- policyStore.loadValidPolicyAt(referenceTimeMillis) match {
+ /**
+ * Provides the current resource mapping. This value is cached.
+ *
+ * NOTE: The assumption is that the resource mapping is always updated with new keys,
+ * that is we allow only the addition of new resource types.
+ */
+ def currentResourceMapping = {
+ this._resourceMapping synchronized this._resourceMapping
+ }
+
+ // def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
+// val policyMspOpt = policyStore.loadPolicyAt(millis)
+// if(policyMspOpt.isEmpty) {
+// throw new AquariumInternalError(
+// "Cannot get resource types map. Not even the default policy found for time %s",
+// TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
+// )
+// }
+//
+// val policyMsg = policyMspOpt.get
+// // TODO optimize
+// ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
+// }
+//
+// def currentResourceTypesMap: Map[String, ResourceType] = {
+// resourceTypesMapAtMillis(TimeHelpers.nowMillis())
+// }
+
+ def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
+ policyStore.loadPolicyAt(referenceTimeMillis) match {
case None ⇒
throw new AquariumInternalError(
"No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
)
- case Some(policy) ⇒
- policy
+ case Some(policyMsg) ⇒
+ ModelFactory.newPolicyModel(policyMsg)
}
}
- def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
- val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
- policyAtReferenceTime.roleMapping.get(role) match {
+ def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = {
+ unsafeValidPolicyModelAt(referenceTimeMillis).msg
+ }
+
+ def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
+ val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
+
+ policyModelAtReferenceTime.roleMapping.get(role) match {
case None ⇒
throw new AquariumInternalError("Unknown price table for role %s at %s".format(
role,
}
}
+ def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
+ val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
+ policyAtReferenceTime.getRoleMapping.get(role) match {
+ case null ⇒
+ throw new AquariumInternalError("Unknown price table for role %s at %s".format(
+ role,
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
+ ))
+
+ case fullPriceTable ⇒
+ fullPriceTable
+ }
+ }
+
+ def unsafeFullPriceTableModelForAgreement(
+ userAgreementModel: UserAgreementModel,
+ knownPolicyModel: PolicyModel
+ ): FullPriceTableModel = {
+ val policyModel = knownPolicyModel match {
+ case null ⇒
+ unsafeValidPolicyModelAt(userAgreementModel.validFromMillis)
+
+ case policyModel ⇒
+ policyModel
+ }
+
+ userAgreementModel.fullPriceTableRef match {
+ case PolicyDefinedFullPriceTableRef ⇒
+ val role = userAgreementModel.role
+ policyModel.roleMapping.get(role) match {
+ case None ⇒
+ throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
+ role,
+ userAgreementModel.userID,
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
+ )
+
+ case Some(fullPriceTable) ⇒
+ fullPriceTable
+ }
+
+ case AdHocFullPriceTableRef(fullPriceTable) ⇒
+ fullPriceTable
+ }
+ }
+
+ def unsafeFullPriceTableForAgreement(
+ userAgreement: UserAgreementMsg,
+ knownPolicyModel: PolicyModel
+ ): FullPriceTableMsg = {
+
+ val policyModel = knownPolicyModel match {
+ case null ⇒
+ unsafeValidPolicyModelAt(userAgreement.getValidFromMillis)
+
+ case policyModel ⇒
+ policyModel
+ }
+
+ unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg)
+ }
+
+ def unsafeFullPriceTableForAgreement(
+ userAgreement: UserAgreementMsg,
+ knownPolicy: PolicyMsg
+ ): FullPriceTableMsg = {
+ val policy = knownPolicy match {
+ case null ⇒
+ unsafeValidPolicyAt(userAgreement.getValidFromMillis)
+
+ case policy ⇒
+ policy
+ }
+
+ val role = userAgreement.getRole
+ userAgreement.getFullPriceTableRef match {
+ case null ⇒
+ policy.getRoleMapping.get(role) match {
+ case null ⇒
+ throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
+ role,
+ userAgreement.getUserID,
+ TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
+ )
+
+ case fullPriceTable ⇒
+ fullPriceTable
+ }
+
+ case fullPriceTable ⇒
+ fullPriceTable
+ }
+ }
+
/**
* Computes the initial user agreement for the given role and reference time. Also,
* records the ID from a potential related IMEvent.
*
- * @param role The role in the agreement
- * @param referenceTimeMillis The reference time to consider for the agreement
+ * @param imEvent The IMEvent that creates the user
*/
- def initialUserAgreement(
- role: String,
- referenceTimeMillis: Long,
- relatedIMEventID: Option[String]
- ): UserAgreementModel = {
+ def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
+ require(MessageHelpers.isUserCreationIMEvent(imEvent))
+
+ val role = imEvent.getRole
+ val referenceTimeMillis = imEvent.getOccurredMillis
// Just checking
- assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
-
- StdUserAgreement(
- "<StandardUserAgreement>",
- relatedIMEventID,
- 0,
- Long.MaxValue,
- role,
- PolicyDefinedFullPriceTableRef
- )
+ assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
+
+ ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
}
- def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
+ def initialUserBalance(role: String, referenceTimeMillis: Long): Real = {
// FIXME: Where is the mapping?
- 1000.0
+ Real.Zero
}
- def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
+ def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
// A resource type never changes charging behavior. By definition.
- val className = resourceType.chargingBehavior
+ val className = resourceType.getChargingBehaviorClass
_chargingBehaviorMap.get(className) match {
case Some(chargingBehavior) ⇒
chargingBehavior
}
}
- def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
+ def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg)
def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
EnvKeys.eventBus,
EnvKeys.restService,
EnvKeys.rabbitMQService,
- EnvKeys.storeWatcherService
+ EnvKeys.storeWatcherService,
+ EnvKeys.rabbitMQProducer
)
object EnvKeys {
final val defaultClassLoader: TypedKey[ClassLoader] =
new AquariumEnvKey[ClassLoader]("default.class.loader")
- final val defaultPolicyModel: TypedKey[PolicyModel] =
- new AquariumEnvKey[PolicyModel]("default.policy.model")
+ final val defaultPolicyMsg: TypedKey[PolicyMsg] =
+ new AquariumEnvKey[PolicyMsg]("default.policy.msg")
}
}