From 695c71e20f5a068757525453e76792914b15b2bd Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Fri, 22 Jun 2012 16:07:56 +0300 Subject: [PATCH] Refactor Aquarium to make it more configurable - No Aquarium singleton any more. - Aquarium is bootstrapped and configured using a builder. - Services are wired a bit differently right now. Needs more testing, in order to reach previous runtime stability. --- pom.xml | 2 +- src/main/resources/aquarium.properties | 28 +- src/main/scala/gr/grnet/aquarium/Aquarium.scala | 608 +++++++------------- .../scala/gr/grnet/aquarium/AquariumAware.scala} | 52 +- .../gr/grnet/aquarium/AquariumAwareSkeleton.scala} | 25 +- .../scala/gr/grnet/aquarium/AquariumBuilder.scala | 415 +++++++++++++ src/main/scala/gr/grnet/aquarium/Main.scala | 20 +- .../scala/gr/grnet/aquarium/ResourceLocator.scala | 29 + .../gr/grnet/aquarium/actor/RoleableActor.scala | 2 +- .../aquarium/actor/service/rest/RESTActor.scala | 15 +- .../aquarium/actor/service/user/UserActor.scala | 9 +- .../computation/UserStateComputations.scala | 18 +- .../computation/state/parts/RoleHistoryItem.scala | 15 +- .../connector/rabbitmq/RabbitMQConsumer.scala | 38 +- .../grnet/aquarium/logic/accounting/Policy.scala | 28 +- .../aquarium/logic/accounting/RoleAgreements.scala | 142 ----- .../gr/grnet/aquarium/service/AkkaService.scala | 9 +- .../grnet/aquarium/service/EventBusService.scala | 24 +- .../grnet/aquarium/service/RESTActorService.scala | 31 +- .../grnet/aquarium/service/RabbitMQService.scala | 23 +- .../service/RoleableActorProviderService.scala | 1 + .../SimpleLocalRoleableActorProviderService.scala | 19 +- .../aquarium/service/StoreWatcherService.scala | 31 +- .../service/event/AquariumCreatedEvent.scala | 45 ++ .../grnet/aquarium/store/ResourceEventStore.scala | 2 +- .../{MemStore.scala => MemStoreProvider.scala} | 6 +- src/test/resources/aquarium.properties | 60 +- .../gr/grnet/aquarium/logic/test/PolicyTest.scala | 114 ---- .../grnet/aquarium/rest/actor/RESTActorTest.scala | 10 +- .../aquarium/user/UserStateComputationsTest.scala | 25 +- 30 files changed, 896 insertions(+), 950 deletions(-) rename src/{test/scala/gr/grnet/aquarium/logic/test/RoleAgreementsTest.scala => main/scala/gr/grnet/aquarium/AquariumAware.scala} (54%) rename src/{test/scala/gr/grnet/aquarium/StoreConfigurator.scala => main/scala/gr/grnet/aquarium/AquariumAwareSkeleton.scala} (69%) create mode 100644 src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala delete mode 100644 src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala create mode 100644 src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala rename src/main/scala/gr/grnet/aquarium/store/memory/{MemStore.scala => MemStoreProvider.scala} (98%) delete mode 100644 src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala diff --git a/pom.xml b/pom.xml index fc3b07b..2c24817 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ 4.0.0 gr.grnet aquarium - 0.2-SNAPSHOT + 0.2.0-SNAPSHOT Aquarium diff --git a/src/main/resources/aquarium.properties b/src/main/resources/aquarium.properties index eab7427..def2921 100644 --- a/src/main/resources/aquarium.properties +++ b/src/main/resources/aquarium.properties @@ -1,13 +1,4 @@ -version = 0.0.2-SNAPSHOT - -# Location of the Aquarium accounting policy config file. If commented -# out, Aquarium will look for the file policy.yaml first at the program -# starting directory and then fall back to the classpath. -aquarium.policy = policy.yaml - -# Location of the file that defines the mappings between -# user roles and agreements -aquarium.role-agreement.map=role-agreement.map +version = 0.2.0-SNAPSHOT ### Queue related settings @@ -27,6 +18,9 @@ rabbitmq.username=guest # Passwd for connecting with the AMQP server rabbitmq.passwd=guest +# Exchnage used by Aquarium to publish messages +rabbitmq.exchange=aquarium + # Virtual host on the AMQP server rabbitmq.vhost=/ @@ -94,19 +88,5 @@ store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider # Override the user event store (if present, it will not be given by the store provider above) #policy.store.class= -# The lower mark for the UserActors' LRU. -user.actor.LRU.lower.mark=800 -# The upper mark for the UserActors' LRU. -user.actors.LRU.upper.mark=1000 - -# A time period in milliseconds for which we can tolerate stale data regarding user state. -user.state.timestamp.threshold=10000 - -# Exchnage used by Aquarium to publish messages -rabbitmq.exchange=aquarium - -# Save unparsed user events to user event store -ack.unparsed.event.im=false - # Administrative REST API authorization cookie admin.cookie=1 \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/Aquarium.scala b/src/main/scala/gr/grnet/aquarium/Aquarium.scala index 52a3749..68941bf 100644 --- a/src/main/scala/gr/grnet/aquarium/Aquarium.scala +++ b/src/main/scala/gr/grnet/aquarium/Aquarium.scala @@ -35,34 +35,31 @@ package gr.grnet.aquarium -import java.io.File - -import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters} -import com.ckkloverdos.maybe._ +import com.ckkloverdos.env.Env +import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey} import com.ckkloverdos.props.Props -import com.ckkloverdos.sys.SysProp - -import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf} -import gr.grnet.aquarium.store._ -import gr.grnet.aquarium.service._ -import gr.grnet.aquarium.converter.StdConverters +import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider} +import java.io.File +import gr.grnet.aquarium.util.{Loggable, Lifecycle} +import gr.grnet.aquarium.service.{RoleableActorProviderService, StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService} +import com.ckkloverdos.convert.Converters import java.util.concurrent.atomic.AtomicBoolean -import gr.grnet.aquarium.ResourceLocator._ +import org.slf4j.{LoggerFactory, Logger} +import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler import gr.grnet.aquarium.computation.UserStateComputations -import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler} +import com.ckkloverdos.maybe._ +import gr.grnet.aquarium.ResourceLocator._ import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap import gr.grnet.aquarium.logic.accounting.Policy -import org.slf4j.{Logger, LoggerFactory} +import com.ckkloverdos.sys.SysProp /** - * This is the Aquarium entry point. - * - * Responsible to load all of application configuration and provide the relevant services. * - * @author Christos KK Loverdos . + * @author Christos KK Loverdos */ -final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariumSelf ⇒ - import Aquarium.Keys + +final class Aquarium(env: Env) extends Lifecycle with Loggable { + import Aquarium.EnvKeys private[this] val _isStopping = new AtomicBoolean(false) @@ -91,199 +88,17 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu getClientLogger(client).warn(fmt.format(args: _*)) } - /** - * Reflectively provide a new instance of a class and configure it appropriately. - */ - private[this] def newInstance[C : Manifest](_className: String = ""): C = { - val className = _className match { - case "" ⇒ - manifest[C].erasure.getName - - case name ⇒ - name - } - - val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C]) - instanceM match { - case Just(instance) ⇒ instance match { - case configurable: Configurable ⇒ - val localProps = configurable.propertyPrefix match { - case Some(prefix) ⇒ - props.subsetForKeyPrefix(prefix) - - case None ⇒ - props - } - - logger.debug("Configuring {} with props", configurable.getClass.getName) - MaybeEither(configurable configure localProps) match { - case Just(_) ⇒ - logger.info("Configured {} with props", configurable.getClass.getName) - instance - - case Failed(e) ⇒ - throw new AquariumInternalError("Could not configure instance of %s".format(className), e) - } - - case _ ⇒ - instance - } - - case Failed(e) ⇒ - throw new AquariumInternalError("Could not instantiate %s".format(className), e) - } - - } - - private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler - - private[this] lazy val _userStateComputations = new UserStateComputations(aquariumSelf) - - private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class)) - - /** - * Initializes a store provider, according to the value configured - * in the configuration file. The - */ - private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class)) - - private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class)) - - private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = { - // If there is a specific `UserStateStore` implementation specified in the - // properties, then this implementation overrides the user store given by - // `StoreProvider`. - props.get(Keys.user_state_store_class) map { className ⇒ - val instance = newInstance[UserStateStore](className) - logger.info("Overriding %s provisioning. Implementation given by: %s".format( - shortNameOfClass(classOf[UserStateStore]), - instance.getClass)) - instance - } - } - - private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = { - // If there is a specific `EventStore` implementation specified in the - // properties, then this implementation overrides the event store given by - // `StoreProvider`. - props.get(Keys.resource_event_store_class) map { className ⇒ - val instance = newInstance[ResourceEventStore](className) - logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass)) - instance - } - } - - private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = { - props.get(Keys.user_event_store_class) map { className ⇒ - val instance = newInstance[IMEventStore](className) - logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass)) - instance - } - } - - private[this] lazy val _policyStoreM: Maybe[PolicyStore] = { - props.get(Keys.policy_store_class) map { - className ⇒ - val instance = newInstance[PolicyStore](className) - logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass)) - instance - } - } - - private[this] lazy val _eventsStoreFolder: Maybe[File] = { - props.get(Keys.events_store_folder) map { - folderName ⇒ - logger.info("{} = {}", Keys.events_store_folder, folderName) - - val canonicalFolder = { - val folder = new File(folderName) - if(folder.isAbsolute) { - folder.getCanonicalFile - } else { - logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder) - new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile - } - } - - val canonicalPath = canonicalFolder.getCanonicalPath - - if(canonicalFolder.exists() && !canonicalFolder.isDirectory) { - throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder)) - } - - // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but - // we still want to keep the events. - val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath - if(canonicalPath.startsWith(ahCanonicalPath)) { - throw new AquariumException( - "%s = %s is under Aquarium Home = %s".format( - Keys.events_store_folder, - canonicalFolder, - ahCanonicalPath - )) - } - - canonicalFolder.mkdirs() - - canonicalFolder + @throws(classOf[AquariumInternalError]) + def apply[T: Manifest](key: TypedKey[T]): T = { + try { + env.getEx(key) + } catch { + case e: Exception ⇒ + throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key)) } } - private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false) - - private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false) - - private[this] lazy val _converters = StdConverters.AllConverters - - private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]() - - private[this] lazy val _akka = newInstance[AkkaService]() - - private[this] lazy val _eventBus = newInstance[EventBusService]() - - private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]() - - private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]() - - private[this] lazy val _allServices = List( - _timerService, - _akka, - _actorProvider, - _eventBus, - _restService, - _rabbitmqService, - _storeWatcherService - ) - - def get(key: String, default: String = ""): String = props.getOr(key, default) - - def defaultClassLoader = Thread.currentThread().getContextClassLoader - - /** - * FIXME: This must be ditched. - * - * Find a file whose location can be overiden in - * the configuration file (e.g. policy.yaml) - * - * @param name Name of the file to search for - * @param prop Name of the property that defines the file path - * @param default Name to return if no file is found - */ - def findConfigFile(name: String, prop: String, default: String): File = { - // Check for the configured value first - val configured = new File(get(prop)) - if (configured.exists) - return configured - - // Look into the configuration context - ResourceLocator.getResource(name) match { - case Just(policyResource) ⇒ - val path = policyResource.url.getPath - new File(path) - case _ ⇒ - new File(default) - } - } + private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_)) private[this] def startServices(): Unit = { for(service ← _allServices) { @@ -303,12 +118,7 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu } } - def stopWithDelay(millis: Long) { - Thread sleep millis - stop() - } - - private[this] def configure(): Unit = { + private[this] def showBasicConfiguration(): Unit = { logger.info("Aquarium Home = %s".format( if(Homes.Folders.AquariumHome.isAbsolute) Homes.Folders.AquariumHome @@ -317,7 +127,7 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu )) for(folder ← this.eventsStoreFolder) { - logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder) + logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder) } this.eventsStoreFolder.throwMe // on error @@ -343,59 +153,71 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu })) } - def start() = { + def start(): Unit = { this._isStopping.set(false) - configure() + showBasicConfiguration() addShutdownHooks() startServices() } - def stop() = { + def stop(): Unit = { this._isStopping.set(true) stopServices() } - def algorithmCompiler = _algorithmCompiler - - def userStateComputations = _userStateComputations - - def converters = _converters - - def actorProvider = _actorProvider - - def eventBus = _eventBus - - def timerService = _timerService - - def userStateStore = { - _userStateStoreM match { - case Just(us) ⇒ us - case _ ⇒ storeProvider.userStateStore - } + /** + * Stops Aquarium after the given millis. Used during testing. + */ + def stopAfterMillis(millis: Long) { + Thread sleep millis + stop() } - def resourceEventStore = { - _resourceEventStoreM match { - case Just(es) ⇒ es - case _ ⇒ storeProvider.resourceEventStore - } - } + /** + * Reflectively provide a new instance of a class and configure it appropriately. + */ + def newInstance[C <: AnyRef](_class: Class[C], className: String): C = { + val originalProps = apply(EnvKeys.originalProps) - def imEventStore = { - _imEventStoreM match { - case Just(es) ⇒ es - case _ ⇒ storeProvider.imEventStore - } - } + val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C]) + instanceM match { + case Just(instance) ⇒ + eventBus.addSubscriber[C](instance) + + instance match { + case configurable: Configurable if (originalProps ne null) ⇒ + val localProps = configurable.propertyPrefix match { + case somePrefix @ Some(prefix) ⇒ + if(prefix.length == 0) { + logger.warn( + "Property prefix for %s is %s. Consider using None".format(instance, somePrefix)) + } + + originalProps.subsetForKeyPrefix(prefix) + + case None ⇒ + originalProps + } + + logger.debug("Configuring {} with props", configurable.getClass.getName) + MaybeEither(configurable configure localProps) match { + case Just(_) ⇒ + logger.info("Configured {} with props", configurable.getClass.getName) + instance + + case Failed(e) ⇒ + throw new AquariumInternalError("Could not configure instance of %s".format(className), e) + } + + case _ ⇒ + instance + } - def policyStore = { - _policyStoreM match { - case Just(es) ⇒ es - case _ ⇒ storeProvider.policyStore + case Failed(e) ⇒ + throw new AquariumInternalError("Could not instantiate %s".format(className), e) } - } - def storeProvider = _storeProvider + } def currentResourcesMap: DSLResourcesMap = { // FIXME: Get rid of this singleton stuff @@ -416,24 +238,42 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu // FIXME: Read from properties? "default" } - - def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = { - val map = this.props.map - val newMap = map.updated(Keys.store_provider_class, spc.getName) - val newProps = new Props(newMap) - new Aquarium(newProps) - } - def eventsStoreFolder = _eventsStoreFolder + def defaultClassLoader = apply(EnvKeys.defaultClassLoader) - def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events + def resourceEventStore = apply(EnvKeys.resourceEventStore) - def saveIMEventsToEventsStoreFolder = _events_store_save_im_events + def imEventStore = apply(EnvKeys.imEventStore) - def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match { - case just @ Just(_) ⇒ just - case _ ⇒ NoVal - } + def userStateStore = apply(EnvKeys.userStateStore) + + def policyStore = apply(EnvKeys.policyStore) + + def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder) + + def algorithmCompiler = apply(EnvKeys.algorithmCompiler) + + def eventBus = apply(EnvKeys.eventBus) + + def userStateComputations = apply(EnvKeys.userStateComputations) + + def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold) + + def adminCookie = apply(EnvKeys.adminCookie) + + def converters = apply(EnvKeys.converters) + + def actorProvider = apply(EnvKeys.actorProvider) + + def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents) + + def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents) + + def timerService = apply(EnvKeys.timerService) + + def restPort = apply(EnvKeys.restPort) + + def version = apply(EnvKeys.version) } object Aquarium { @@ -451,137 +291,68 @@ object Aquarium { SysProp.FileEncoding ) - implicit val DefaultConverters = TheDefaultConverters - - final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML - - final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP - - final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource - - final lazy val AquariumProperties = { - val maybeProps = Props(AquariumPropertiesResource) - maybeProps match { - case Just(props) ⇒ - props - - case NoVal ⇒ - throw new AquariumInternalError( - "Could not load %s from %s".format( - ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, - AquariumPropertiesResource)) - - - case Failed(e) ⇒ - throw new AquariumInternalError( - "Could not load %s from %s".format( - ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, - AquariumPropertiesResource), - e) - } - } - - /** - * The main [[gr.grnet.aquarium.Aquarium]] instance. - */ - final lazy val Instance = { - Maybe(new Aquarium(AquariumProperties)) match { - case Just(masterConf) ⇒ - masterConf - - case NoVal ⇒ - throw new AquariumInternalError( - "Could not create Aquarium configuration from %s".format( - AquariumPropertiesResource)) - - case Failed(e) ⇒ - throw new AquariumInternalError( - "Could not create Aquarium configuration from %s".format( - AquariumPropertiesResource), - e) - } - } - - /** - * Defines the names of all the known keys inside the master properties file. - */ - final object Keys { + object HTTP { + final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie" + final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase + } + + final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) { + override def toString = name + } + + final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List( + EnvKeys.timerService, + EnvKeys.akkaService, + EnvKeys.actorProvider, + EnvKeys.eventBus, + EnvKeys.restService, + EnvKeys.rabbitMQService, + EnvKeys.storeWatcherService + ) + object EnvKeys { /** * The Aquarium version. Will be reported in any due occasion. */ - final val version = "version" + final val version = StringKey("version") - /** - * The fully qualified name of the class that implements the `RoleableActorProviderService`. - * Will be instantiated reflectively and should have a public default constructor. - */ - final val actor_provider_class = "actor.provider.class" - - /** - * The class that initializes the REST service - */ - final val rest_service_class = "rest.service.class" + final val originalProps: TypedKey[Props] = + new AquariumEnvKey[Props]("originalProps") /** * The fully qualified name of the class that implements the `StoreProvider`. * Will be instantiated reflectively and should have a public default constructor. */ - final val store_provider_class = "store.provider.class" - - /** - * The class that implements the User store - */ - final val user_state_store_class = "user.state.store.class" + final val storeProvider: TypedKey[StoreProvider] = + new AquariumEnvKey[StoreProvider]("store.provider.class") /** - * The class that implements the resource event store - */ - final val resource_event_store_class = "resource.event.store.class" - - /** - * The class that implements the IM event store - */ - final val user_event_store_class = "user.event.store.class" - - /** - * The class that implements the wallet entries store - */ - final val policy_store_class = "policy.store.class" - - - /** The lower mark for the UserActors' LRU. - * - * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation. + * If a value is given to this property, then it represents a folder where all events coming to aquarium are + * saved. * + * This is for debugging purposes. */ - final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark" + final val eventsStoreFolder: TypedKey[Option[File]] = + new AquariumEnvKey[Option[File]]("events.store.folder") /** - * The upper mark for the UserActors' LRU. + * If this is `true` and `events.store.folder` is defined, then all resource events are + * also stored in `events.store.folder`. * - * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation. + * This is for debugging purposes. */ - final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark" - /** - * REST service listening port. - * - * Default is 8080. - */ - final val rest_port = "rest.port" + final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events") /** - * Location of the Aquarium accounting policy config file + * If this is `true` and `events.store.folder` is defined, then all IM events are + * also stored in `events.store.folder`. + * + * This is for debugging purposes. */ - final val aquarium_policy = "aquarium.policy" + final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events") /** - * Location of the role-agreement mapping file - */ - final val aquarium_role_agreement_map = "aquarium.role-agreement.map" - - /** * A time period in milliseconds for which we can tolerate stale parts regarding user state. * * The smaller the value, the more accurate the user credits and other state parts are. @@ -590,56 +361,73 @@ object Aquarium { * the timestamp of the last known balance amount by this value, then a re-computation for * the balance is triggered. */ - final val user_state_timestamp_threshold = "user.state.timestamp.threshold" + final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold") /** - * The time unit is the lowest billable time period. - * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine - * seconds, then the user will be billed for ten seconds. + * REST service listening port. * - * This is an overall constant. We use it as a property in order to prepare ourselves for - * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud - * infrastructures. + * Default is 8080. */ - final val time_unit_in_millis = "time.unit.in.seconds" + final val restPort = IntKey("rest.port") /** - * If a value is given to this property, then it represents a folder where all events coming to aquarium are - * saved. + * A cookie used in every administrative REST API call, so that Aquarium knows it comes from + * an authorised client. */ - final val events_store_folder = "events.store.folder" + final val adminCookie: TypedKey[Option[String]] = + new AquariumEnvKey[Option[String]]("admin.cookie") - /** - * If this is `true` and `events.store.folder` is defined, then all resource events are - * also stored in `events.store.folder`. - * - * This is for debugging purposes. - */ - final val events_store_save_rc_events = "events.store.save.rc.events" + final val resourceEventStore: TypedKey[ResourceEventStore] = + new AquariumEnvKey[ResourceEventStore]("resource.event.store.class") - /** - * If this is `true` and `events.store.folder` is defined, then all IM events are - * also stored in `events.store.folder`. - * - * This is for debugging purposes. - */ - final val events_store_save_im_events = "events.store.save.im.events" + final val imEventStore: TypedKey[IMEventStore] = + new AquariumEnvKey[IMEventStore]("im.event.store.class") + + final val userStateStore: TypedKey[UserStateStore] = + new AquariumEnvKey[UserStateStore]("user.state.store.class") + + final val policyStore: TypedKey[PolicyStore] = + new AquariumEnvKey[PolicyStore]("policy.store.class") /** - * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is - * saved to the [[gr.grnet.aquarium.store.IMEventStore]]. + * The class that initializes the REST service */ - final val save_unparsed_event_im = "save.unparsed.event.im" + final val restService: TypedKey[Lifecycle] = + new AquariumEnvKey[Lifecycle]("rest.service.class") /** - * A cookie used in every administrative REST API call, so that Aquarium knows it comes from - * an authorised client. + * The fully qualified name of the class that implements the `RoleableActorProviderService`. + * Will be instantiated reflectively and should have a public default constructor. */ - final val admin_cookie = "admin.cookie" - } + final val actorProvider: TypedKey[RoleableActorProviderService] = + new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class") + + final val akkaService: TypedKey[AkkaService] = + new AquariumEnvKey[AkkaService]("akka.service") + + final val eventBus: TypedKey[EventBusService] = + new AquariumEnvKey[EventBusService]("event.bus.service") + + final val timerService: TypedKey[TimerService] = + new AquariumEnvKey[TimerService]("timer.service") + + final val rabbitMQService: TypedKey[RabbitMQService] = + new AquariumEnvKey[RabbitMQService]("rabbitmq.service") + + final val storeWatcherService: TypedKey[StoreWatcherService] = + new AquariumEnvKey[StoreWatcherService]("store.watcher.service") + + final val converters: TypedKey[Converters] = + new AquariumEnvKey[Converters]("converters") + + final val algorithmCompiler: TypedKey[CostPolicyAlgorithmCompiler] = + new AquariumEnvKey[CostPolicyAlgorithmCompiler]("algorithm.compiler") + + final val userStateComputations: TypedKey[UserStateComputations] = + new AquariumEnvKey[UserStateComputations]("user.state.computations") + + final val defaultClassLoader: TypedKey[ClassLoader] = + new AquariumEnvKey[ClassLoader]("default.class.loader") - object HTTP { - final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie" - final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase } } diff --git a/src/test/scala/gr/grnet/aquarium/logic/test/RoleAgreementsTest.scala b/src/main/scala/gr/grnet/aquarium/AquariumAware.scala similarity index 54% rename from src/test/scala/gr/grnet/aquarium/logic/test/RoleAgreementsTest.scala rename to src/main/scala/gr/grnet/aquarium/AquariumAware.scala index a96702f..cf67021 100644 --- a/src/test/scala/gr/grnet/aquarium/logic/test/RoleAgreementsTest.scala +++ b/src/main/scala/gr/grnet/aquarium/AquariumAware.scala @@ -33,57 +33,15 @@ * or implied, of GRNET S.A. */ -package gr.grnet.aquarium.logic.test - -import org.junit.Assert._ -import org.junit.{Test} -import io.Source -import gr.grnet.aquarium.util.TestMethods -import gr.grnet.aquarium.logic.accounting.{Policy, RoleAgreements} +package gr.grnet.aquarium +import gr.grnet.aquarium.service.event.AquariumCreatedEvent /** - * Tests for the [[gr.grnet.aquarium.logic.accounting.RoleAgreements]] class * - * @author Georgios Gousios + * @author Christos KK Loverdos */ -class RoleAgreementsTest extends TestMethods { - - @Test - def testParseMappings { - - var a = """ - - # Some useless comment -student=foobar # This should be ignored (no default policy) -prof=default - name=default -%asd=default # This should be ignored (non accepted char) -*=default - """ - - var src = Source.fromBytes(a.getBytes()) - var output = RoleAgreements.parseMappings(src) - - assertEquals(3, output.size) - assertEquals("default", output.getOrElse("prof",null).name) - - // No default value - a = """ - prof=default - """ - src = Source.fromBytes(a.getBytes()) - assertThrows[RuntimeException](RoleAgreements.parseMappings(src)) - } - - @Test - def testLoadMappings { - // Uses the roles-agreements.map file in test/resources - RoleAgreements.loadMappings - - assertEquals("default", RoleAgreements.agreementForRole("student").name) - // Check that default policies are applied - assertEquals("default", RoleAgreements.agreementForRole("foobar").name) - } +trait AquariumAware { + def awareOfAquariumEx(event: AquariumCreatedEvent): Unit } diff --git a/src/test/scala/gr/grnet/aquarium/StoreConfigurator.scala b/src/main/scala/gr/grnet/aquarium/AquariumAwareSkeleton.scala similarity index 69% rename from src/test/scala/gr/grnet/aquarium/StoreConfigurator.scala rename to src/main/scala/gr/grnet/aquarium/AquariumAwareSkeleton.scala index 95cf7a4..ee0b827 100644 --- a/src/test/scala/gr/grnet/aquarium/StoreConfigurator.scala +++ b/src/main/scala/gr/grnet/aquarium/AquariumAwareSkeleton.scala @@ -35,24 +35,21 @@ package gr.grnet.aquarium -import store.memory.MemStore -import store.mongodb.MongoDBStoreProvider -import util.Loggable +import gr.grnet.aquarium.service.event.AquariumCreatedEvent +import com.google.common.eventbus.Subscribe /** - * Returns the appropriate Configurator implementation depending on value - * of the test.store runtime parameter. * - * @author Georgios Gousios + * @author Christos KK Loverdos */ -trait StoreConfigurator extends Loggable { - def configurator: Aquarium = - LogicTestsAssumptions.propertyValue(PropertyNames.TestStore) match { - case "mem" => Aquarium.Instance.withStoreProviderClass(classOf[MemStore]) - case "mongo" => Aquarium.Instance.withStoreProviderClass(classOf[MongoDBStoreProvider]) - case _ => - logger.warn("Unknown store type, defaulting to \"mem\"") - Aquarium.Instance.withStoreProviderClass(classOf[MemStore]) +trait AquariumAwareSkeleton extends AquariumAware { + @volatile protected var _aquarium: Aquarium = null + + final def aquarium = _aquarium + + @Subscribe + def awareOfAquariumEx(event: AquariumCreatedEvent) = { + this._aquarium = event.aquarium } } diff --git a/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala b/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala new file mode 100644 index 0000000..d135648 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala @@ -0,0 +1,415 @@ +/* + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium + +import com.ckkloverdos.key.{BooleanKey, TypedKey} +import com.ckkloverdos.env.Env +import com.ckkloverdos.props.Props +import com.ckkloverdos.maybe.{MaybeOption, Failed, MaybeEither, Just, NoVal} +import gr.grnet.aquarium.util.Loggable +import java.io.File +import gr.grnet.aquarium.store.StoreProvider +import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler +import gr.grnet.aquarium.computation.UserStateComputations +import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, AkkaService, SimpleTimerService, EventBusService} +import gr.grnet.aquarium.converter.StdConverters +import gr.grnet.aquarium.service.event.AquariumCreatedEvent + +/** + * Create a tailored Aquarium. + * + * Thread-unsafe. + * + * @author Christos KK Loverdos + */ + +final class AquariumBuilder(val originalProps: Props) extends Loggable { + if(originalProps eq null) { + throw new AquariumInternalError("props is null") + } + + import Aquarium.EnvKeys + + private[this] var _env = Env() + // This is special + private[this] val eventBus = new EventBusService + + @volatile + private[this] var _aquarium: Aquarium = _ + + @throws(classOf[AquariumInternalError]) + private def propsGetEx(key: String): String = { + try { + originalProps.getEx(key) + } catch { + case e: Exception ⇒ + throw new AquariumInternalError("Could not locate %s in Aquarium properties".format(key)) + } + } + + @throws(classOf[AquariumInternalError]) + private def envGetEx[T: Manifest](key: TypedKey[T]): T = { + try { + _env.getEx(key) + } catch { + case e: Exception ⇒ + throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key)) + } + } + + def update[T: Manifest](keyvalue: (TypedKey[T], T)): this.type = { + assert(keyvalue ne null, "keyvalue ne null") + + _env += keyvalue + this + } + + def update[T : Manifest](key: TypedKey[T], value: T): this.type = { + assert(key ne null, "key ne null") + + this update (key -> value) + } + + /** + * Reflectively provide a new instance of a class and configure it appropriately. + */ + private[this] def newInstance[C <: AnyRef](manifest: Manifest[C], className: String): C = { + val defaultClassLoader = Thread.currentThread().getContextClassLoader + val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C]) + instanceM match { + case Just(instance) ⇒ + eventBus.addSubscriber(instance) + + instance match { + case configurable: Configurable if (originalProps ne null) ⇒ + val localProps = configurable.propertyPrefix match { + case somePrefix @ Some(prefix) ⇒ + if(prefix.length == 0) { + logger.warn( + "Property prefix for %s is %s. Consider using None".format(instance, somePrefix)) + } + + originalProps.subsetForKeyPrefix(prefix) + + case None ⇒ + originalProps + } + + logger.debug("Configuring {} with props", configurable.getClass.getName) + MaybeEither(configurable configure localProps) match { + case Just(_) ⇒ + logger.info("Configured {} with props", configurable.getClass.getName) + instance + + case Failed(e) ⇒ + throw new AquariumInternalError("Could not configure instance of %s".format(className), e) + } + + case _ ⇒ + instance + } + + case Failed(e) ⇒ + throw new AquariumInternalError("Could not instantiate %s".format(className), e) + } + } + + private[this] def checkStoreProviderOverride: Unit = { + val envKey = EnvKeys.storeProvider + if(_env.contains(envKey)) { + return + } + + if(originalProps eq null) { + throw new AquariumInternalError("Cannot locate store provider, since no properties have been defined") + } + + val propName = envKey.name + originalProps.get(propName) match { + case Just(propValue) ⇒ + update(envKey, newInstance(envKey.keyType, propValue)) + + case NoVal ⇒ + throw new AquariumInternalError("No store provider is given in properties") + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName)) + } + } + + private[this] def checkStoreOverrides: Unit = { + if(originalProps eq null) { + return + } + + def checkOverride[S <: AnyRef : Manifest](envKey: TypedKey[S], f: StoreProvider ⇒ S): Unit = { + if(!_env.contains(envKey)) { + val propName = envKey.name + + originalProps.get(propName) match { + case Just(propValue) ⇒ + // Create the store reflectively + update(envKey, newInstance(envKey.keyType, propValue)) + + case NoVal ⇒ + // Get the store from the store provider + val storeProvider = this.envGetEx(EnvKeys.storeProvider) + val propValue = f(storeProvider) + update(envKey, propValue) + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName)) + } + } + } + + // If a store has not been specifically overridden, we load it from the properties + checkOverride(EnvKeys.resourceEventStore, _.resourceEventStore) + checkOverride(EnvKeys.imEventStore, _.imEventStore) + checkOverride(EnvKeys.userStateStore, _.userStateStore) + checkOverride(EnvKeys.policyStore, _.policyStore) + } + + private[this] def checkEventsStoreFolderOverride: Unit = { + val propName = EnvKeys.eventsStoreFolder.name + + _env.get(EnvKeys.eventsStoreFolder) match { + case Just(storeFolderOption) ⇒ + // Some value has been set, even a None, so do nothing more + logger.info("{} = {}", propName, storeFolderOption) + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for env key %s".format(propName)) + + case NoVal ⇒ + if(originalProps eq null) { + update(EnvKeys.eventsStoreFolder, None) + return + } + + // load from props + for(folderName ← originalProps.get(propName)) { + logger.info("{} = {}", propName, folderName) + + update(EnvKeys.eventsStoreFolder, Some(new File(folderName))) + } + + } + } + + private[this] def checkEventsStoreFolderExistence: Unit = { + val propName = EnvKeys.eventsStoreFolder.name + for(folder ← this.envGetEx(EnvKeys.eventsStoreFolder)) { + val canonicalFolder = { + if(folder.isAbsolute) { + folder.getCanonicalFile + } else { + logger.info("{} is not absolute, making it relative to Aquarium Home", propName) + new File(ResourceLocator.Homes.Folders.AquariumHome, folder.getPath).getCanonicalFile + } + } + + val canonicalPath = canonicalFolder.getCanonicalPath + + if(canonicalFolder.exists() && !canonicalFolder.isDirectory) { + throw new AquariumInternalError("%s = %s is not a folder".format(propName, canonicalFolder)) + } + + // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but + // we still want to keep the events. + val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath + if(canonicalPath.startsWith(ahCanonicalPath)) { + throw new AquariumInternalError( + "%s = %s is under Aquarium Home = %s".format( + propName, + canonicalFolder, + ahCanonicalPath + )) + } + + canonicalFolder.mkdirs() + + update(EnvKeys.eventsStoreFolder, Some(canonicalFolder)) + } + } + + private[this] def checkEventsStoreFolderVariablesOverrides: Unit = { + def checkVar(envKey: BooleanKey): Unit = { + if(!_env.contains(envKey)) { + val propName = envKey.name + originalProps.getBoolean(propName) match { + case Just(propValue) ⇒ + update(envKey, propValue) + + case NoVal ⇒ + update(envKey, false) + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName)) + } + } + } + + checkVar(EnvKeys.eventsStoreSaveRCEvents) + checkVar(EnvKeys.eventsStoreSaveIMEvents) + } + + private[this] def checkRestServiceOverride: Unit = { + checkNoPropsOverride(EnvKeys.restService) { envKey ⇒ + val envKey = EnvKeys.restService + val propName = envKey.name + val propValue = propsGetEx(propName) + + newInstance(envKey.keyType, propValue) + } + } + + private[this] def checkNoPropsOverride[T: Manifest](envKey: TypedKey[T])(f: TypedKey[T] ⇒ T): Unit = { + if(_env.contains(envKey)) { + return + } + + update(envKey, f(envKey)) + } + + private[this] def checkPropsOverride[T: Manifest](envKey: TypedKey[T])(f: (TypedKey[T], String) ⇒ T): Unit = { + if(_env.contains(envKey)) { + return + } + + val propName = envKey.name + originalProps.get(propName) match { + case Just(propValue) ⇒ + update(envKey, f(envKey, propValue)) + + case NoVal ⇒ + throw new AquariumInternalError("No value for key %s in properties".format(propName)) + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName)) + } + } + + private[this] def checkOptionalPropsOverride[T: Manifest] + (envKey: TypedKey[Option[T]]) + (f: (TypedKey[Option[T]], String) ⇒ Option[T]): Unit = { + + if(_env.contains(envKey)) { + return + } + + val propName = envKey.name + originalProps.get(propName) match { + case Just(propValue) ⇒ + update(envKey, f(envKey, propValue)) + + case NoVal ⇒ + update(envKey, None) + + case Failed(e) ⇒ + throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName)) + } + } + + def build(): Aquarium = { + if(this._aquarium ne null) { + return this._aquarium + } + + checkPropsOverride(EnvKeys.version) { (envKey, propValue) ⇒ propValue } + + checkNoPropsOverride(EnvKeys.eventBus) { _ ⇒ eventBus } + + checkNoPropsOverride(EnvKeys.originalProps) { _ ⇒ originalProps } + + checkNoPropsOverride(EnvKeys.defaultClassLoader) { _ ⇒ Thread.currentThread().getContextClassLoader } + + checkNoPropsOverride(EnvKeys.converters) { _ ⇒ StdConverters.AllConverters } + + checkStoreProviderOverride + checkStoreOverrides + + checkEventsStoreFolderOverride + checkEventsStoreFolderExistence + checkEventsStoreFolderVariablesOverrides + + checkRestServiceOverride + + checkNoPropsOverride(EnvKeys.timerService) { envKey ⇒ + newInstance(envKey.keyType, classOf[SimpleTimerService].getName) + } + + checkNoPropsOverride(EnvKeys.algorithmCompiler) { _ ⇒ SimpleCostPolicyAlgorithmCompiler } + + checkNoPropsOverride(EnvKeys.userStateComputations) { envKey ⇒ + newInstance(envKey.keyType, classOf[UserStateComputations].getName) + } + + checkNoPropsOverride(EnvKeys.akkaService) { envKey ⇒ + newInstance(envKey.keyType, classOf[AkkaService].getName) + } + + checkNoPropsOverride(EnvKeys.rabbitMQService) { envKey ⇒ + newInstance(envKey.keyType, classOf[RabbitMQService].getName) + } + + checkNoPropsOverride(EnvKeys.storeWatcherService) { envKey ⇒ + newInstance(envKey.keyType, classOf[StoreWatcherService].getName) + } + + checkPropsOverride(EnvKeys.actorProvider) { (envKey, propValue) ⇒ + newInstance(envKey.keyType, propValue) + } + + checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒ + propValue.toLong + } + + checkPropsOverride(EnvKeys.restPort) { (envKey, propValue) ⇒ + propValue.toInt + } + + checkOptionalPropsOverride(EnvKeys.adminCookie) { (envKey, propValue) ⇒ + Some(propValue) + } + + this._aquarium = new Aquarium(_env) + + this._aquarium.eventBus.syncPost(AquariumCreatedEvent(this._aquarium)) + + this._aquarium + } +} diff --git a/src/main/scala/gr/grnet/aquarium/Main.scala b/src/main/scala/gr/grnet/aquarium/Main.scala index eae67a8..bfb52e7 100644 --- a/src/main/scala/gr/grnet/aquarium/Main.scala +++ b/src/main/scala/gr/grnet/aquarium/Main.scala @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory import ch.qos.logback.classic.LoggerContext import ch.qos.logback.classic.joran.JoranConfigurator import com.ckkloverdos.maybe.Just +import gr.grnet.aquarium.service.event.AquariumCreatedEvent /** * Main method for Aquarium @@ -63,19 +64,6 @@ object Main extends LazyLoggable { } } - def doStart(): Unit = { - import ResourceLocator.SysEnvs - - // We have AKKA builtin, so no need to mess with pre-existing installation. - if(SysEnvs.AKKA_HOME.value.isJust) { - val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME)) - logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error) - throw error - } - - Aquarium.Instance.start() - } - def main(args: Array[String]) = { configureLogging() @@ -83,9 +71,11 @@ object Main extends LazyLoggable { logStarting("Aquarium") val ms0 = TimeHelpers.nowMillis() try { - doStart() + val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build() + aquarium.start() + val ms1 = TimeHelpers.nowMillis() - logStarted(ms0, ms1, "Aquarium") + logStarted(ms0, ms1, "Aquarium [%s]", aquarium.version) logSeparator() } catch { case e: Throwable ⇒ diff --git a/src/main/scala/gr/grnet/aquarium/ResourceLocator.scala b/src/main/scala/gr/grnet/aquarium/ResourceLocator.scala index 4f1f028..4f4bb44 100644 --- a/src/main/scala/gr/grnet/aquarium/ResourceLocator.scala +++ b/src/main/scala/gr/grnet/aquarium/ResourceLocator.scala @@ -42,6 +42,11 @@ import java.io.File import gr.grnet.aquarium.util.justForSure import gr.grnet.aquarium.util.isRunningTests import com.ckkloverdos.resource.{FileStreamResource, StreamResource, CompositeStreamResourceContext, ClassLoaderStreamResourceContext, FileStreamResourceContext} +import com.ckkloverdos.props.Props +import com.ckkloverdos.convert.Converters._ +import com.ckkloverdos.maybe.Just +import com.ckkloverdos.maybe.Failed +import com.ckkloverdos.convert.Converters /** * Locates resources. @@ -267,6 +272,30 @@ object ResourceLocator { } } + final lazy val AquariumProperties = { + implicit val DefaultConverters = Converters.DefaultConverters + val maybeProps = Props(Resources.AquariumPropertiesResource) + maybeProps match { + case Just(props) ⇒ + props + + case NoVal ⇒ + throw new AquariumInternalError( + "Could not load %s from %s".format( + ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, + Resources.AquariumPropertiesResource)) + + + case Failed(e) ⇒ + throw new AquariumInternalError( + "Could not load %s from %s".format( + ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, + Resources.AquariumPropertiesResource), + e) + } + } + + def getResource(what: String): Maybe[StreamResource] = { ResourceContexts.MasterResourceContext.getResource(what) } diff --git a/src/main/scala/gr/grnet/aquarium/actor/RoleableActor.scala b/src/main/scala/gr/grnet/aquarium/actor/RoleableActor.scala index 5e1b41c..eb92f67 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/RoleableActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/RoleableActor.scala @@ -45,7 +45,7 @@ import util.{Loggable, shortClassNameOf} * * @author Christos KK Loverdos . */ -trait RoleableActor extends Actor with Loggable { +trait RoleableActor extends Actor with Loggable with AquariumAwareSkeleton { def role: ActorRole override def toString = "%s@%s(%s)".format(shortClassNameOf(this), System.identityHashCode(this), role.role) diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala index deb14b8..d730281 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala @@ -46,13 +46,12 @@ import gr.grnet.aquarium.actor.{RESTRole, RoleableActor, RouterRole} import RESTPaths._ import gr.grnet.aquarium.util.date.TimeHelpers import org.joda.time.format.ISODateTimeFormat -import gr.grnet.aquarium.actor.message.admin.PingAllRequest -import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, ActorMessage, GetUserBalanceRequest} +import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest} import gr.grnet.aquarium.{ResourceLocator, Aquarium} import com.ckkloverdos.resource.StreamResource -import com.ckkloverdos.maybe.{Failed, NoVal, Just} +import com.ckkloverdos.maybe.Failed import java.net.InetAddress -import gr.grnet.aquarium.event.model.{ExternalEventModel, EventModel} +import gr.grnet.aquarium.event.model.ExternalEventModel /** * Spray-based REST service. This is the outer-world's interface to Aquarium functionality. @@ -67,9 +66,6 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { final val TEXT_PLAIN = "text/plain" final val APPLICATION_JSON = "application/json" - - private[this] def aquarium = Aquarium.Instance - private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = { HttpResponse( status, @@ -124,7 +120,7 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { )( f: RequestResponder ⇒ Unit): Unit = { aquarium.adminCookie match { - case Just(adminCookie) ⇒ + case Some(adminCookie) ⇒ headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match { case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒ try f(responder) @@ -143,7 +139,7 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN)) } - case NoVal ⇒ + case None ⇒ responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN)) } } @@ -250,7 +246,6 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable { private[this] def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = { - val aquarium = Aquarium.Instance val actorProvider = aquarium.actorProvider val router = actorProvider.actorForRole(RouterRole) val futureResponse = router ask message diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 0f20383..7ee1c7a 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -46,12 +46,11 @@ import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.event.model.im.IMEventModel import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType} -import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup} -import gr.grnet.aquarium.{AquariumInternalError, Aquarium} +import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival} +import gr.grnet.aquarium.AquariumInternalError import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot import gr.grnet.aquarium.computation.BillingMonthInfo import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState} -import gr.grnet.aquarium.event.model.resource.ResourceEventModel /** * @@ -83,8 +82,6 @@ class UserActor extends ReflectiveRoleableActor { def role = UserActorRole - private[this] def aquarium: Aquarium = Aquarium.Instance - private[this] def userStateComputations = aquarium.userStateComputations private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ { @@ -92,7 +89,7 @@ class UserActor extends ReflectiveRoleableActor { } private[this] def _timestampTheshold = { - aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(1000L * 60 * 5 /* 5 minutes */) + aquarium.userStateTimestampThreshold } private[this] def haveUserState = { diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala index 4c7bf28..0ee314d 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala @@ -42,24 +42,22 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap import gr.grnet.aquarium.computation.state.parts._ import gr.grnet.aquarium.event.model.NewWalletEntry import gr.grnet.aquarium.event.model.resource.ResourceEventModel -import gr.grnet.aquarium.{Aquarium, AquariumInternalError} +import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, AquariumAware, AquariumInternalError} import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason} import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState} +import gr.grnet.aquarium.service.event.AquariumCreatedEvent +import com.google.common.eventbus.Subscribe /** * * @author Christos KK Loverdos */ -final class UserStateComputations(_aquarium: => Aquarium) extends Loggable { - - lazy val aquarium = _aquarium - - lazy val storeProvider = aquarium.storeProvider - lazy val timeslotComputations = new TimeslotComputations {} +final class UserStateComputations extends AquariumAwareSkeleton with Loggable { + lazy val timeslotComputations = new TimeslotComputations {} // FIXME lazy val algorithmCompiler = aquarium.algorithmCompiler - lazy val policyStore = storeProvider.policyStore - lazy val userStateStoreForRead = storeProvider.userStateStore - lazy val resourceEventStore = storeProvider.resourceEventStore + lazy val policyStore = aquarium.policyStore + lazy val userStateStoreForRead = aquarium.userStateStore + lazy val resourceEventStore = aquarium.resourceEventStore def findUserStateAtEndOfBillingMonth( userStateBootstrap: UserStateBootstrap, diff --git a/src/main/scala/gr/grnet/aquarium/computation/state/parts/RoleHistoryItem.scala b/src/main/scala/gr/grnet/aquarium/computation/state/parts/RoleHistoryItem.scala index eed9cde..59d2344 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/state/parts/RoleHistoryItem.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/state/parts/RoleHistoryItem.scala @@ -60,18 +60,9 @@ case class RoleHistoryItem( */ validTo: Long = Long.MaxValue ) { - - try { - require( - validFrom <= validTo, - "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo))) - } - catch { - case e: IllegalArgumentException ⇒ - // TODO Remove this - Aquarium.Instance.debug(this, "!! validFrom = %s, validTo = %s, dx=%s", validFrom, validTo, validTo-validFrom) - throw e - } + require( + validFrom <= validTo, + "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo))) require(name ne null, "Name is not null") diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala index 76eb837..80c1413 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala @@ -54,22 +54,24 @@ import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.{RabbitMQConKeys, * @author Christos KK Loverdos */ -class RabbitMQConsumer(val conf: RabbitMQConsumerConf, - - /** - * Specifies what we do with the message payload. - */ - handler: PayloadHandler, - - /** - * Specifies how we execute the handler - */ - executor: PayloadHandlerExecutor, - - /** - * After the payload is processed, we call this function with ourselves and the result. - */ - notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit +class RabbitMQConsumer( + val aquarium: Aquarium, + val conf: RabbitMQConsumerConf, + + /** + * Specifies what we do with the message payload. + */ + handler: PayloadHandler, + + /** + * Specifies how we execute the handler + */ + executor: PayloadHandlerExecutor, + + /** + * After the payload is processed, we call this function with ourselves and the result. + */ + notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit ) extends Loggable with Lifecycle { consumerSelf ⇒ private[this] var _factory: ConnectionFactory = _ @@ -126,7 +128,7 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, case object LifecycleStartReason extends StartReason case object PingStartReason extends StartReason - private[this] def timerService = Aquarium.Instance.timerService + private[this] def timerService = aquarium.timerService private[this] lazy val servers = { conf.connectionConf(RabbitMQConKeys.servers) @@ -261,8 +263,6 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, safeStop() } - private[this] def aquarium = Aquarium.Instance - private[this] def postBusError(event: BusEvent): Unit = { aquarium.eventBus ! event } diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala index be6f4c6..deee38f 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala @@ -36,23 +36,21 @@ package gr.grnet.aquarium.logic.accounting import dsl.{Timeslot, DSLPolicy, DSL} -import gr.grnet.aquarium.Aquarium._ import java.io.{InputStream, FileInputStream, File} import java.util.Date import gr.grnet.aquarium.util.Loggable import java.util.concurrent.atomic.AtomicReference -import gr.grnet.aquarium.Aquarium.Keys -import com.ckkloverdos.maybe.{Failed, NoVal, Just} import collection.immutable.{TreeMap, SortedMap} -import gr.grnet.aquarium.{AquariumException, Aquarium} +import gr.grnet.aquarium.{ResourceLocator, AquariumAwareSkeleton, AquariumException, Aquarium} import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers} +import com.ckkloverdos.maybe.{Failed, Just} /** * Searches for and loads the applicable accounting policy * * @author Georgios Gousios */ -object Policy extends DSL with Loggable { +object Policy extends DSL with Loggable with AquariumAwareSkeleton { /* Pointer to the latest policy */ private[logic] lazy val policies = { @@ -163,7 +161,7 @@ object Policy extends DSL with Loggable { */ private[logic] def reloadPolicies: SortedMap[Timeslot, DSLPolicy] = if (config == null) - reloadPolicies(Instance) + reloadPolicies(aquarium) else reloadPolicies(config) @@ -174,24 +172,24 @@ object Policy extends DSL with Loggable { //2. Check whether policy file has been updated val latestPolicyChange = if (pol.isEmpty) 0 else pol.last.validFrom - val policyf = Instance.findConfigFile(PolicyConfName, Keys.aquarium_policy, PolicyConfName) + + val policyf = ResourceLocator.Resources.PolicyYAMLResource var updated = false if (policyf.exists) { - if (policyf.lastModified > latestPolicyChange) { - logger.info("Policy file updated since last check, reloading") updated = true - } else { - logger.info("Policy file not changed since last check") - } - } else { + } else { logger.warn("User specified policy file %s does not exist, " + - "using stored policy information".format(policyf.getAbsolutePath)) + "using stored policy information".format(policyf.url)) } if (updated) { val ts = TimeHelpers.nowMillis() - val parsedNew = loadPolicyFromFile(policyf) + val parsedNewM = policyf.mapInputStream(parse).toMaybeEither + val parsedNew = parsedNewM match { + case Just(parsedNew) ⇒ parsedNew + case Failed(e) ⇒ throw e + } val newPolicy = parsedNew.toPolicyEntry config.policyStore.findPolicyEntry(newPolicy.id) match { diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala deleted file mode 100644 index ea26311..0000000 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2011-2012 GRNET S.A. All rights reserved. - * - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the following - * disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS - * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR - * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF - * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED - * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and - * documentation are those of the authors and should not be - * interpreted as representing official policies, either expressed - * or implied, of GRNET S.A. - */ - -package gr.grnet.aquarium.logic.accounting - -import dsl.DSLAgreement -import gr.grnet.aquarium.util.Loggable -import gr.grnet.aquarium.Aquarium -import gr.grnet.aquarium.Aquarium.{Instance, Keys} -import io.Source -import java.io.{InputStream, File} -import java.util.regex.Pattern - -/** - * Encapsulates mappings from user roles to Aquarium policies. The mappings - * are used during new user registration to automatically set a policy to - * a user according to its role. - * - * The configuration is read from a configuration file pointed to by the - * main Aquarium configuration file. The - * - * @author Georgios Gousios - */ -object RoleAgreements extends Loggable { - - private var mappings: Map[String, DSLAgreement] = loadMappings - - /** - * Returns the agreement that matches the provided role. The search for a - * matching agreement is done with the current version of the Aquarium - * policy. - */ - def agreementForRole(role : String) = mappings.get(role.toLowerCase) match { - case Some(x) => x - case None => mappings.get("*").getOrElse( - throw new RuntimeException("Cannot find agreement for default role *")) - } - - /** - * Trigger reloading of the mappings file. - */ - def reloadMappings = mappings = loadMappings - - /** - * Load and parse the mappings file - */ - private[logic] def loadMappings = synchronized { - val config = Aquarium.Instance.get(Keys.aquarium_role_agreement_map) - val configFile = Aquarium.Instance.findConfigFile( - Aquarium.RolesAgreementsName, Keys.aquarium_role_agreement_map, - Aquarium.RolesAgreementsName) - - def loadFromClasspath: Source = { - getClass.getClassLoader.getResourceAsStream(Aquarium.RolesAgreementsName) match { - case x: InputStream => - logger.warn("Using default role to agreement mappings, this is " + - "problably not what you want") - Source.fromInputStream(x) - case null => - logger.error("No valid role to agreement mappings configuration found, " + - "Aquarium will fail") - null - } - } - - val source = if (configFile.exists && configFile.isFile) { - if (configFile.isFile) - Source.fromFile(configFile) - else { - logger.warn(("Configured file %s is a directory. " + - "Trying the default one.").format(config)) - loadFromClasspath - } - } else { - logger.warn("Configured file %s for role-agreement mappings cannot " + - "be found. Trying the default one.".format(config)) - loadFromClasspath - } - - parseMappings(source) - } - - def parseMappings(src: Source) = { - val p = Pattern.compile("^\\s*([\\*a-zA-Z0-9-_]+)\\s*=\\s*([a-zA-Z0-9-_]+).*$") - - val mappings = src.getLines.foldLeft(Map[String, DSLAgreement]()) { - (acc, l) => - l match { - case x if (x.matches("^\\s*$")) => acc - case x if (x.matches("^\\s*\\#")) => acc - case x if (p.matcher(x).find()) => - // Ugly code warning - val m = p.matcher(x) - m.find() - val role = m.group(1) - val agrName = m.group(2) - Policy.policy.findAgreement(agrName) match { - case Some(x) => acc ++ Map(role -> x) - case None => - logger.warn("No agreement with name %s".format(agrName)) - acc - } - case _ => acc - } - } - if (!mappings.keysIterator.contains("*")) - throw new RuntimeException("Cannot find agreement for default role *") - mappings - } -} diff --git a/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala b/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala index e6eafce..a7b0f42 100644 --- a/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/AkkaService.scala @@ -37,7 +37,8 @@ package gr.grnet.aquarium.service import akka.actor.Actor import gr.grnet.aquarium.util.{Loggable, Lifecycle} -import gr.grnet.aquarium.util.date.TimeHelpers +import gr.grnet.aquarium.ResourceLocator.SysEnvs +import gr.grnet.aquarium.AquariumInternalError /** * A wrapper around Akka, so that it is uniformly treated as an Aquarium service. @@ -47,6 +48,12 @@ import gr.grnet.aquarium.util.date.TimeHelpers final class AkkaService extends Lifecycle with Loggable { def start() = { + // We have AKKA builtin, so no need to mess with pre-existing installation. + if(SysEnvs.AKKA_HOME.value.isJust) { + val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME)) + logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error) + throw error + } } def stop()= { diff --git a/src/main/scala/gr/grnet/aquarium/service/EventBusService.scala b/src/main/scala/gr/grnet/aquarium/service/EventBusService.scala index 1753cce..54f7d03 100644 --- a/src/main/scala/gr/grnet/aquarium/service/EventBusService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/EventBusService.scala @@ -38,7 +38,7 @@ package gr.grnet.aquarium.service import gr.grnet.aquarium.Configurable import com.ckkloverdos.props.Props import gr.grnet.aquarium.service.event.BusEvent -import com.google.common.eventbus.{AsyncEventBus, DeadEvent, Subscribe} +import com.google.common.eventbus.{EventBus, AsyncEventBus, DeadEvent, Subscribe} import gr.grnet.aquarium.util.{DaemonThreadFactory, Lifecycle, Loggable} import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.Collections @@ -51,11 +51,14 @@ import java.util.Collections */ class EventBusService extends Loggable with Lifecycle with Configurable { + private[this] val className = classOf[EventBusService].getName private[this] val asyncBus = new AsyncEventBus( - classOf[EventBusService].getName, + "%s/async".format(className), Executors.newFixedThreadPool(1, new DaemonThreadFactory) ) + private[this] val syncBus = new EventBus("%s/sync") + private[this] val subscribers = Collections.newSetFromMap[AnyRef](new ConcurrentHashMap()) def propertyPrefix = None @@ -75,16 +78,23 @@ class EventBusService extends Loggable with Lifecycle with Configurable { def stop() = synchronized { val iterator = subscribers.iterator() while(iterator.hasNext) { - asyncBus.unregister(iterator.next()) + val subscriber = iterator.next() + asyncBus.unregister(subscriber) + syncBus.unregister(subscriber) } subscribers.clear() } - @inline - def post[A <: BusEvent](event: A): Unit = { - this ! event + /** + * Posts an event synchronously. + */ + def syncPost[A <: BusEvent](event: A): Unit = { + syncBus.post(event) } + /** + * Post an event asynchronously. + */ def ![A <: BusEvent](event: A): Unit = { asyncBus.post(event) } @@ -92,11 +102,13 @@ class EventBusService extends Loggable with Lifecycle with Configurable { def removeSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized { subscribers.remove(subscriber) asyncBus.unregister(subscriber) + syncBus.register(subscriber) } def addSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized { subscribers.add(subscriber) asyncBus.register(subscriber) + syncBus.register(subscriber) } @Subscribe diff --git a/src/main/scala/gr/grnet/aquarium/service/RESTActorService.scala b/src/main/scala/gr/grnet/aquarium/service/RESTActorService.scala index 40aefd6..09ce11f 100644 --- a/src/main/scala/gr/grnet/aquarium/service/RESTActorService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/RESTActorService.scala @@ -39,26 +39,35 @@ import gr.grnet.aquarium.actor.RESTRole import _root_.akka.actor._ import cc.spray.can.{ServerConfig, HttpClient, HttpServer} import gr.grnet.aquarium.util.{Loggable, Lifecycle} -import gr.grnet.aquarium.{AquariumInternalError, Aquarium} +import gr.grnet.aquarium.{Configurable, AquariumAwareSkeleton, Aquarium} +import com.ckkloverdos.props.Props /** * REST service based on Actors and Spray. * * @author Christos KK Loverdos . */ -class RESTActorService extends Lifecycle with Loggable { +class RESTActorService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable { private[this] var _port: Int = 8080 private[this] var _restActor: ActorRef = _ private[this] var _serverActor: ActorRef = _ private[this] var _clientActor: ActorRef = _ - def start(): Unit = { - val aquarium = Aquarium.Instance - this._port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr( - throw new AquariumInternalError( - "%s was not specified in Aquarium properties".format(Aquarium.Keys.rest_port))) - logger.debug("Starting on port %s".format(this._port)) + def propertyPrefix = Some(RESTActorService.Prefix) + + /** + * Configure this instance with the provided properties. + * + * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix. + */ + def configure(props: Props) { + this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name) + logger.debug("HTTP port is %s".format(this._port)) + } + + def start(): Unit = { + logger.info("Starting HTTP on port %s".format(this._port)) this._restActor = aquarium.actorProvider.actorForRole(RESTRole) // Start Spray subsystem @@ -67,7 +76,13 @@ class RESTActorService extends Lifecycle with Loggable { } def stop(): Unit = { + logger.info("Stopping HTTP on port %s".format(this._port)) + this._serverActor.stop() this._clientActor.stop() } +} + +object RESTActorService { + final val Prefix = "rest" } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala b/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala index c1e42c3..87992ce 100644 --- a/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala @@ -37,13 +37,13 @@ package gr.grnet.aquarium.service import com.ckkloverdos.props.Props import com.google.common.eventbus.Subscribe -import gr.grnet.aquarium.{Aquarium, Configurable} +import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable} import gr.grnet.aquarium.converter.StdConverters import gr.grnet.aquarium.actor.RouterRole import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle} import gr.grnet.aquarium.util.sameTags -import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent} +import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent} import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier} import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys @@ -54,14 +54,12 @@ import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, R * @author Christos KK Loverdos */ -class RabbitMQService extends Loggable with Lifecycle with Configurable { +class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton { @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters) @volatile private[this] var _consumers = List[RabbitMQConsumer]() def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix) - def aquarium = Aquarium.Instance - def eventBus = aquarium.eventBus def resourceEventStore = aquarium.resourceEventStore @@ -79,11 +77,18 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { */ def configure(props: Props) = { this._props = props + } + + @Subscribe + override def awareOfAquariumEx(event: AquariumCreatedEvent) { + super.awareOfAquariumEx(event) - doConfigure() + aquarium.eventBus.addSubscriber(this) + + doSetup() } - private[this] def doConfigure(): Unit = { + private[this] def doSetup(): Unit = { val postNotifier = new PayloadHandlerPostNotifier(logger) val rcHandler = new ResourceEventPayloadHandler(aquarium, logger) @@ -133,6 +138,7 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { rccc.queueName )) new RabbitMQConsumer( + aquarium, rccc, rcHandler, futureExecutor, @@ -148,6 +154,7 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { imcc.queueName )) new RabbitMQConsumer( + aquarium, imcc, imHandler, futureExecutor, @@ -164,8 +171,6 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { } def start() = { - aquarium.eventBus.addSubscriber(this) - safeStart() } diff --git a/src/main/scala/gr/grnet/aquarium/service/RoleableActorProviderService.scala b/src/main/scala/gr/grnet/aquarium/service/RoleableActorProviderService.scala index e092965..5d1556a 100644 --- a/src/main/scala/gr/grnet/aquarium/service/RoleableActorProviderService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/RoleableActorProviderService.scala @@ -39,6 +39,7 @@ import akka.actor.ActorRef import com.ckkloverdos.props.Props import gr.grnet.aquarium.util.Lifecycle import gr.grnet.aquarium.actor.ActorRole +import gr.grnet.aquarium.AquariumAware /** * diff --git a/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala b/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala index 4bf7440..6b1f6b8 100644 --- a/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala @@ -37,22 +37,25 @@ package gr.grnet.aquarium.service import com.ckkloverdos.props.Props import akka.actor.ActorRef -import gr.grnet.aquarium.Configurable +import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable} import java.util.concurrent.ConcurrentHashMap import gr.grnet.aquarium.util.Loggable -import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured} import gr.grnet.aquarium.actor._ - /** * All actors are provided locally. * * @author Christos KK Loverdos . */ -class SimpleLocalRoleableActorProviderService extends RoleableActorProviderService with Configurable with Loggable { +class SimpleLocalRoleableActorProviderService + extends RoleableActorProviderService + with AquariumAwareSkeleton + with Configurable + with Loggable { + private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef] - private[this] var _props: Props = _ + @volatile private[this] var _props: Props = _ def propertyPrefix = None @@ -73,7 +76,11 @@ class SimpleLocalRoleableActorProviderService extends RoleableActorProviderServi } private[this] def _newActor(role: ActorRole): ActorRef = { - val actorRef = akka.actor.Actor.actorOf(role.actorType).start() + val actorFactory = (_class: Class[_ <: RoleableActor]) ⇒ { + aquarium.newInstance(_class, _class.getName) + } + + val actorRef = akka.actor.Actor.actorOf(actorFactory(role.actorType)).start() val propsMsg = AquariumPropertiesLoaded(this._props) if(role.canHandleConfigurationMessage(propsMsg)) { diff --git a/src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala b/src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala index d689795..271a409 100644 --- a/src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala @@ -38,7 +38,7 @@ package gr.grnet.aquarium.service import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag} import java.util.concurrent.atomic.AtomicBoolean import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent} -import gr.grnet.aquarium.{Configurable, Aquarium} +import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, Aquarium} import com.ckkloverdos.props.Props import gr.grnet.aquarium.store.StoreProvider @@ -48,7 +48,7 @@ import gr.grnet.aquarium.store.StoreProvider * @author Christos KK Loverdos */ -final class StoreWatcherService extends Lifecycle with Configurable with Loggable { +final class StoreWatcherService extends Lifecycle with Configurable with AquariumAwareSkeleton with Loggable { private[this] var _reconnectPeriodMillis = 1000L private[this] val _pingIsScheduled = new AtomicBoolean(false) @@ -56,9 +56,6 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl private[this] val _rcIsAlive = new AtomicBoolean(true) private[this] val _imIsAlive = new AtomicBoolean(true) - def aquarium = Aquarium.Instance - - def propertyPrefix = Some(StoreProvider.Prefix) /** @@ -70,10 +67,13 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis) } - private[this] def safePingStore(tag: Tag, - pinger: () ⇒ Any, - getStatus: () ⇒ Boolean, - setStatus: (Boolean) ⇒ Any): Unit = { + private[this] def safePingStore( + tag: Tag, + pinger: () ⇒ Any, + getStatus: () ⇒ Boolean, + setStatus: (Boolean) ⇒ Any + ): Unit = { + try { val wasAlive = getStatus() pinger() @@ -92,11 +92,14 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl } } - private[this] def doSchedulePing(tag: Tag, - info: String, - pinger: () ⇒ Any, - getStatus: () ⇒ Boolean, - setStatus: (Boolean) ⇒ Any): Unit = { + private[this] def doSchedulePing( + tag: Tag, + info: String, + pinger: () ⇒ Any, + getStatus: () ⇒ Boolean, + setStatus: (Boolean) ⇒ Any + ): Unit = { + aquarium.timerService.scheduleOnce( info, { diff --git a/src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala b/src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala new file mode 100644 index 0000000..fe0fc39 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.service.event + +import gr.grnet.aquarium.Aquarium + +/** + * + * @author Christos KK Loverdos + */ + +final case class AquariumCreatedEvent(aquarium: Aquarium) extends BusEvent diff --git a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala index 99f6d2f..72f851f 100644 --- a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala @@ -50,7 +50,7 @@ trait ResourceEventStore { def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent def clearResourceEvents(): Unit = { - // This method is implemented only in MemStore. + // This method is implemented only in MemStoreProvider. throw new AquariumInternalError("Unsupported operation") } diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala similarity index 98% rename from src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala rename to src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala index 5744876..22a621a 100644 --- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala @@ -59,7 +59,7 @@ import gr.grnet.aquarium.computation.BillingMonthInfo * @author Georgios Gousios */ -class MemStore extends UserStateStore +class MemStoreProvider extends UserStateStore with Configurable with PolicyStore with ResourceEventStore with IMEventStore with StoreProvider { @@ -87,7 +87,7 @@ class MemStore extends UserStateStore "PolicyEntry" -> _policyEntries.size ) - "MemStore(%s)" format map + "MemStoreProvider(%s)" format map } //+ StoreProvider @@ -278,7 +278,7 @@ class MemStore extends UserStateStore } } -object MemStore { +object MemStoreProvider { final def isLocalIMEvent(event: IMEventModel) = event match { case _: MemIMEvent ⇒ true case _ ⇒ false diff --git a/src/test/resources/aquarium.properties b/src/test/resources/aquarium.properties index c1f730a..def2921 100644 --- a/src/test/resources/aquarium.properties +++ b/src/test/resources/aquarium.properties @@ -1,13 +1,4 @@ -version = 0.0.2-SNAPSHOT - -# Location of the Aquarium accounting policy config file. If commented -# out, Aquarium will look for the file policy.yaml first at the program -# starting directory and then fall back to the classpath. -aquarium.policy=policy.yaml - -# Location of the file that defines the mappings between -# user roles and agreements -aquarium.role-agreement.map=role-agreement.map +version = 0.2.0-SNAPSHOT ### Queue related settings @@ -18,8 +9,7 @@ rabbitmq.reconnect.period.millis=1000 # active-active mode. rabbitmq.servers=localhost -# Comma separated list of rabbitmq servers to use. The servers must be in an -# active-active mode. +# Port for connecting to the AMQP server rabbitmq.port=5672 # User name for connecting with the AMQP server @@ -28,6 +18,9 @@ rabbitmq.username=guest # Passwd for connecting with the AMQP server rabbitmq.passwd=guest +# Exchnage used by Aquarium to publish messages +rabbitmq.exchange=aquarium + # Virtual host on the AMQP server rabbitmq.vhost=/ @@ -65,13 +58,14 @@ mongodb.dbschema=aquarium mongodb.connection.pool.size=20 # Relative to AQUARIUM_HOME or an absolute path -events.store.folder=../events-store +# DO NOT set this in production +#events.store.folder=../events-store # Store resource events to events.store.folder as well -events.store.save.rc.events=true +events.store.save.rc.events=false # Store IM events to events.store.folder as well -events.store.save.im.events=true +events.store.save.im.events=false # How often do we attempt a reconnection to the store(s)? anystore.reconnect.period.millis=1000 @@ -82,43 +76,17 @@ anystore.reconnect.period.millis=1000 # Actor subsystem actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService - # Class that initializes the REST service rest.service.class=gr.grnet.aquarium.service.RESTActorService - # Store subsystem -store.provider.class=gr.grnet.aquarium.store.mongodb.MemStore - +store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider # Override the user store (if present, it will not be given by the store provider above) -user.state.store.class=gr.grnet.aquarium.store.memory.MemStore - -# Override the event store (if present, it will not be given by the store provider above) -resource.event.store.class=gr.grnet.aquarium.store.memory.MemStore - +#user.state.store.class=gr.grnet.aquarium.store.memory.MemStorede the event store (if present, it will not be given by the store provider above) +#resource.event.store.class= # Override the user event store (if present, it will not be given by the store provider above) -user.event.store.class=gr.grnet.aquarium.store.memory.MemStore - +#user.event.store.class= # Override the user event store (if present, it will not be given by the store provider above) -policy.store.class=gr.grnet.aquarium.store.memory.MemStore - -# The lower mark for the UserActors' LRU. -user.actor.LRU.lower.mark=800 - -# The upper mark for the UserActors' LRU. -user.actors.LRU.upper.mark=1000 - -# A time period in milliseconds for which we can tolerate stale data regarding user state. -user.state.timestamp.threshold=10000 - -# Comma separated list of exchanges known to aquarium -rabbitmq.exchange=aquarium - -# This is an absolute constant for the lifetime of an Aquarium installation. -# 1 means that every second counts -time.unit.in.seconds = 1 - -# Save unparsed user events to user event store -ack.unparsed.event.im=false +#policy.store.class= # Administrative REST API authorization cookie admin.cookie=1 \ No newline at end of file diff --git a/src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala b/src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala deleted file mode 100644 index 533b376..0000000 --- a/src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2011-2012 GRNET S.A. All rights reserved. - * - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the following - * disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS - * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR - * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF - * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED - * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and - * documentation are those of the authors and should not be - * interpreted as representing official policies, either expressed - * or implied, of GRNET S.A. - */ - -package gr.grnet.aquarium.logic.test - -import org.junit.Test -import org.junit.Assert._ -import gr.grnet.aquarium.{StoreConfigurator} -import gr.grnet.aquarium.util.date.TimeHelpers -import gr.grnet.aquarium.logic.accounting.Policy -import java.io.File - -/** - * Tests for the Policy resolution algorithms - * - * @author Georgios Gousios - */ -class PolicyTest extends DSLTestBase with StoreConfigurator { - - @Test - def testReloadPolicies: Unit = { - - def copyModifyFile(from: String, to: String) = { - val extra = " - agreement:\n overrides: default\n name: foobar" - val out = new java.io.BufferedWriter(new java.io.FileWriter(to) ); - io.Source.fromFile(from).getLines.map(x => x + "\n").foreach(s => out.write(s,0,s.length)); - out.write(extra) - out.close() - } - - //Initial policy file, read from class path - Policy.withConfigurator(configurator) - val pol = Policy.policies.get - - /*val f = Policy.policyFile - assertTrue(f.exists) - - //Touch the file to trigger reloading with non changed state - Thread.sleep(200) - f.setLastModified(TimeHelpers.nowMillis) - var polNew = Policy.reloadPolicies - - assertEquals(pol.keys.size, polNew.keys.size) - //assertEquals(pol.keys.head, polNew.keys.head) - - //Copy the file and add a new element -> new policy - val fileCopy = new File(f.getParent, "policy.yaml.old") - f.renameTo(fileCopy) - copyModifyFile(fileCopy.getAbsolutePath, - (new File(fileCopy.getParent, "policy.yaml")).getAbsolutePath) - - polNew = Policy.reloadPolicies - assertEquals(pol.keys.size + 1, polNew.keys.size) - val policyEffectivities = Policy.policies.get.keySet.toList.sortWith((x,y) => if (y.from after x.from) true else false) - testSuccessiveTimeslots(policyEffectivities) - testNoGaps(policyEffectivities) - */ - } - - @Test - def testLoadStore: Unit = { - before - - val policies = configurator.policyStore - policies.storePolicyEntry(this.dsl.toPolicyEntry) - - val copy1 = this.dsl.copy(algorithms = List()) - policies.storePolicyEntry(copy1.toPolicyEntry) - - val copy2 = this.dsl.copy(pricelists = List()) - policies.storePolicyEntry(copy2.toPolicyEntry) - - var pol = policies.loadPolicyEntriesAfter(TimeHelpers.nowMillis()) - assert(pol.isEmpty) - - pol = policies.loadPolicyEntriesAfter(0) - assertEquals(3, pol.size) - assertEquals(pol.head.policyYAML, this.dsl.toYAML) - assertEquals(pol.tail.head.policyYAML, copy1.toYAML) - assertEquals(pol.tail.tail.head.policyYAML, copy2.toYAML) - } -} diff --git a/src/test/scala/gr/grnet/aquarium/rest/actor/RESTActorTest.scala b/src/test/scala/gr/grnet/aquarium/rest/actor/RESTActorTest.scala index e70373a..68b4732 100644 --- a/src/test/scala/gr/grnet/aquarium/rest/actor/RESTActorTest.scala +++ b/src/test/scala/gr/grnet/aquarium/rest/actor/RESTActorTest.scala @@ -46,7 +46,7 @@ import cc.spray.can.{HttpResponse, HttpHeader, HttpRequest} import gr.grnet.aquarium.util.makeString import gr.grnet.aquarium.converter.StdConverters import net.liftweb.json.JsonAST.{JValue, JInt} -import gr.grnet.aquarium.{AquariumException, LogicTestsAssumptions, Aquarium} +import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, AquariumException, LogicTestsAssumptions, Aquarium} /** * @@ -58,10 +58,10 @@ class RESTActorTest { assumeTrue(LogicTestsAssumptions.EnableSprayTests) // Initialize configuration subsystem - val aquarium = Aquarium.Instance + val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build() aquarium.start() - val port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr( - throw new AquariumException("No %s specified in aquarium properties".format(Aquarium.Keys.rest_port))) + + val port = aquarium.restPort val dialog = SprayHttpDialog("localhost", port) val pingReq = HttpRequest(method = GET, uri = "/ping", headers = HttpHeader("Content-Type", "text/plain; charset=UTF-8")::Nil) @@ -84,6 +84,6 @@ class RESTActorTest { } } - aquarium.stopWithDelay(1000) + aquarium.stopAfterMillis(1000) } } \ No newline at end of file diff --git a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala index 7d9c28c..098783e 100644 --- a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala +++ b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala @@ -35,7 +35,7 @@ package gr.grnet.aquarium.user -import gr.grnet.aquarium.store.memory.MemStore +import gr.grnet.aquarium.store.memory.MemStoreProvider import gr.grnet.aquarium.logic.accounting.dsl._ import gr.grnet.aquarium.logic.accounting.Policy import gr.grnet.aquarium.util.{Loggable, ContextualLogger} @@ -43,8 +43,7 @@ import gr.grnet.aquarium.simulation._ import gr.grnet.aquarium.uid.{UIDGenerator, ConcurrentVMLocalUIDGenerator} import org.junit.{Assert, Ignore, Test} import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler} -import gr.grnet.aquarium.AquariumException -import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance} +import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder, AquariumException} import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, MonthlyBillingCalculation} import gr.grnet.aquarium.util.date.MutableDateCalc import gr.grnet.aquarium.computation.BillingMonthInfo @@ -128,10 +127,13 @@ aquariumpolicy: DiskspacePriceUnit ) - val aquarium = AquariumInstance.withStoreProviderClass(classOf[MemStore]) - Policy.withConfigurator(aquarium) - val StoreProvider = aquarium.storeProvider - val ResourceEventStore = StoreProvider.resourceEventStore + val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties). + update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider). + build() + + Policy.withConfigurator(aquarium) // FIXME + + val ResourceEventStore = aquarium.resourceEventStore val Computations = aquarium.userStateComputations @@ -236,12 +238,13 @@ aquariumpolicy: val policyOccurredMillis = policyDateCalc.toMillis val policyValidFromMillis = policyDateCalc.copy.goPreviousYear.toMillis val policyValidToMillis = policyDateCalc.copy.goNextYear.toMillis - StoreProvider.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis, policyValidToMillis)) + aquarium.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis, + policyValidToMillis)) - val Aquarium = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), StoreProvider.resourceEventStore) - val DefaultResourcesMap = Aquarium.resourcesMap + val AquariumSim_ = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), aquarium.resourceEventStore) + val DefaultResourcesMap = AquariumSim_.resourcesMap - val UserCKKL = Aquarium.newUser("CKKL", UserCreationDate) + val UserCKKL = AquariumSim_.newUser("CKKL", UserCreationDate) // val InitialUserState = UserState.createInitialUserState( // userID = UserCKKL.userID, -- 1.7.10.4