- No Aquarium singleton any more.
- Aquarium is bootstrapped and configured using a builder.
- Services are wired a bit differently right now. Needs more testing, in
order to reach previous runtime stability.
<modelVersion>4.0.0</modelVersion>
<groupId>gr.grnet</groupId>
<artifactId>aquarium</artifactId>
- <version>0.2-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<!-- Project details-->
<name>Aquarium</name>
-version = 0.0.2-SNAPSHOT
-
-# Location of the Aquarium accounting policy config file. If commented
-# out, Aquarium will look for the file policy.yaml first at the program
-# starting directory and then fall back to the classpath.
-aquarium.policy = policy.yaml
-
-# Location of the file that defines the mappings between
-# user roles and agreements
-aquarium.role-agreement.map=role-agreement.map
+version = 0.2.0-SNAPSHOT
### Queue related settings
# Passwd for connecting with the AMQP server
rabbitmq.passwd=guest
+# Exchange used by Aquarium to publish messages
+rabbitmq.exchange=aquarium
+
# Virtual host on the AMQP server
rabbitmq.vhost=/
# Override the user event store (if present, it will not be given by the store provider above)
#policy.store.class=
-# The lower mark for the UserActors' LRU.
-user.actor.LRU.lower.mark=800
-# The upper mark for the UserActors' LRU.
-user.actors.LRU.upper.mark=1000
-
-# A time period in milliseconds for which we can tolerate stale data regarding user state.
-user.state.timestamp.threshold=10000
-
-# Exchnage used by Aquarium to publish messages
-rabbitmq.exchange=aquarium
-
-# Save unparsed user events to user event store
-ack.unparsed.event.im=false
-
# Administrative REST API authorization cookie
admin.cookie=1
\ No newline at end of file
package gr.grnet.aquarium
-import java.io.File
-
-import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import com.ckkloverdos.maybe._
+import com.ckkloverdos.env.Env
+import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
import com.ckkloverdos.props.Props
-import com.ckkloverdos.sys.SysProp
-
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf}
-import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.service._
-import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
+import java.io.File
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import gr.grnet.aquarium.service.{RoleableActorProviderService, StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
+import com.ckkloverdos.convert.Converters
import java.util.concurrent.atomic.AtomicBoolean
-import gr.grnet.aquarium.ResourceLocator._
+import org.slf4j.{LoggerFactory, Logger}
+import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
import gr.grnet.aquarium.computation.UserStateComputations
-import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
+import com.ckkloverdos.maybe._
+import gr.grnet.aquarium.ResourceLocator._
import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
import gr.grnet.aquarium.logic.accounting.Policy
-import org.slf4j.{Logger, LoggerFactory}
+import com.ckkloverdos.sys.SysProp
/**
- * This is the Aquarium entry point.
- *
- * Responsible to load all of application configuration and provide the relevant services.
*
- * @author Christos KK Loverdos <loverdos@gmail.com>.
+ * @author Christos KK Loverdos <loverdos@gmail.com>
*/
-final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariumSelf ⇒
- import Aquarium.Keys
+
+final class Aquarium(env: Env) extends Lifecycle with Loggable {
+ import Aquarium.EnvKeys
private[this] val _isStopping = new AtomicBoolean(false)
getClientLogger(client).warn(fmt.format(args: _*))
}
- /**
- * Reflectively provide a new instance of a class and configure it appropriately.
- */
- private[this] def newInstance[C : Manifest](_className: String = ""): C = {
- val className = _className match {
- case "" ⇒
- manifest[C].erasure.getName
-
- case name ⇒
- name
- }
-
- val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
- instanceM match {
- case Just(instance) ⇒ instance match {
- case configurable: Configurable ⇒
- val localProps = configurable.propertyPrefix match {
- case Some(prefix) ⇒
- props.subsetForKeyPrefix(prefix)
-
- case None ⇒
- props
- }
-
- logger.debug("Configuring {} with props", configurable.getClass.getName)
- MaybeEither(configurable configure localProps) match {
- case Just(_) ⇒
- logger.info("Configured {} with props", configurable.getClass.getName)
- instance
-
- case Failed(e) ⇒
- throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
- }
-
- case _ ⇒
- instance
- }
-
- case Failed(e) ⇒
- throw new AquariumInternalError("Could not instantiate %s".format(className), e)
- }
-
- }
-
- private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler
-
- private[this] lazy val _userStateComputations = new UserStateComputations(aquariumSelf)
-
- private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
-
- /**
- * Initializes a store provider, according to the value configured
- * in the configuration file. The
- */
- private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
-
- private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
-
- private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
- // If there is a specific `UserStateStore` implementation specified in the
- // properties, then this implementation overrides the user store given by
- // `StoreProvider`.
- props.get(Keys.user_state_store_class) map { className ⇒
- val instance = newInstance[UserStateStore](className)
- logger.info("Overriding %s provisioning. Implementation given by: %s".format(
- shortNameOfClass(classOf[UserStateStore]),
- instance.getClass))
- instance
- }
- }
-
- private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
- // If there is a specific `EventStore` implementation specified in the
- // properties, then this implementation overrides the event store given by
- // `StoreProvider`.
- props.get(Keys.resource_event_store_class) map { className ⇒
- val instance = newInstance[ResourceEventStore](className)
- logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
- instance
- }
- }
-
- private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
- props.get(Keys.user_event_store_class) map { className ⇒
- val instance = newInstance[IMEventStore](className)
- logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
- instance
- }
- }
-
- private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
- props.get(Keys.policy_store_class) map {
- className ⇒
- val instance = newInstance[PolicyStore](className)
- logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
- instance
- }
- }
-
- private[this] lazy val _eventsStoreFolder: Maybe[File] = {
- props.get(Keys.events_store_folder) map {
- folderName ⇒
- logger.info("{} = {}", Keys.events_store_folder, folderName)
-
- val canonicalFolder = {
- val folder = new File(folderName)
- if(folder.isAbsolute) {
- folder.getCanonicalFile
- } else {
- logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder)
- new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile
- }
- }
-
- val canonicalPath = canonicalFolder.getCanonicalPath
-
- if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
- throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
- }
-
- // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
- // we still want to keep the events.
- val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
- if(canonicalPath.startsWith(ahCanonicalPath)) {
- throw new AquariumException(
- "%s = %s is under Aquarium Home = %s".format(
- Keys.events_store_folder,
- canonicalFolder,
- ahCanonicalPath
- ))
- }
-
- canonicalFolder.mkdirs()
-
- canonicalFolder
+ /**
+ * Looks up the value bound to `key` in the Aquarium environment.
+ *
+ * @param key the typed environment key to resolve
+ * @return the value that `env.getEx(key)` yields for the key
+ * @throws AquariumInternalError if the lookup throws; the original exception is attached as the cause
+ */
+ @throws(classOf[AquariumInternalError])
+ def apply[T: Manifest](key: TypedKey[T]): T = {
+ try {
+ env.getEx(key)
+ } catch {
+ case e: Exception ⇒
+ // Pass the underlying exception along as the cause so the reason for the failed lookup is not lost.
+ throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key), e)
 }
 }
- private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false)
-
- private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false)
-
- private[this] lazy val _converters = StdConverters.AllConverters
-
- private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]()
-
- private[this] lazy val _akka = newInstance[AkkaService]()
-
- private[this] lazy val _eventBus = newInstance[EventBusService]()
-
- private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
-
- private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]()
-
- private[this] lazy val _allServices = List(
- _timerService,
- _akka,
- _actorProvider,
- _eventBus,
- _restService,
- _rabbitmqService,
- _storeWatcherService
- )
-
- def get(key: String, default: String = ""): String = props.getOr(key, default)
-
- def defaultClassLoader = Thread.currentThread().getContextClassLoader
-
- /**
- * FIXME: This must be ditched.
- *
- * Find a file whose location can be overiden in
- * the configuration file (e.g. policy.yaml)
- *
- * @param name Name of the file to search for
- * @param prop Name of the property that defines the file path
- * @param default Name to return if no file is found
- */
- def findConfigFile(name: String, prop: String, default: String): File = {
- // Check for the configured value first
- val configured = new File(get(prop))
- if (configured.exists)
- return configured
-
- // Look into the configuration context
- ResourceLocator.getResource(name) match {
- case Just(policyResource) ⇒
- val path = policyResource.url.getPath
- new File(path)
- case _ ⇒
- new File(default)
- }
- }
+ private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
private[this] def startServices(): Unit = {
for(service ← _allServices) {
}
}
- def stopWithDelay(millis: Long) {
- Thread sleep millis
- stop()
- }
-
- private[this] def configure(): Unit = {
+ private[this] def showBasicConfiguration(): Unit = {
logger.info("Aquarium Home = %s".format(
if(Homes.Folders.AquariumHome.isAbsolute)
Homes.Folders.AquariumHome
))
for(folder ← this.eventsStoreFolder) {
- logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder)
+ logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
}
this.eventsStoreFolder.throwMe // on error
}))
}
- def start() = {
+ def start(): Unit = {
this._isStopping.set(false)
- configure()
+ showBasicConfiguration()
addShutdownHooks()
startServices()
}
- def stop() = {
+ def stop(): Unit = {
this._isStopping.set(true)
stopServices()
}
- def algorithmCompiler = _algorithmCompiler
-
- def userStateComputations = _userStateComputations
-
- def converters = _converters
-
- def actorProvider = _actorProvider
-
- def eventBus = _eventBus
-
- def timerService = _timerService
-
- def userStateStore = {
- _userStateStoreM match {
- case Just(us) ⇒ us
- case _ ⇒ storeProvider.userStateStore
- }
+ /**
+ * Blocks the calling thread for `millis` milliseconds and then calls `stop()`.
+ * Used during testing.
+ *
+ * @param millis how long to sleep before stopping, in milliseconds
+ */
+ def stopAfterMillis(millis: Long): Unit = {
+ Thread.sleep(millis)
+ stop()
 }
- def resourceEventStore = {
- _resourceEventStoreM match {
- case Just(es) ⇒ es
- case _ ⇒ storeProvider.resourceEventStore
- }
- }
+ /**
+ * Reflectively provide a new instance of a class and configure it appropriately.
+ */
+ def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
+ val originalProps = apply(EnvKeys.originalProps)
- def imEventStore = {
- _imEventStoreM match {
- case Just(es) ⇒ es
- case _ ⇒ storeProvider.imEventStore
- }
- }
+ val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
+ instanceM match {
+ case Just(instance) ⇒
+ eventBus.addSubscriber[C](instance)
+
+ instance match {
+ case configurable: Configurable if (originalProps ne null) ⇒
+ val localProps = configurable.propertyPrefix match {
+ case somePrefix @ Some(prefix) ⇒
+ if(prefix.length == 0) {
+ logger.warn(
+ "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
+ }
+
+ originalProps.subsetForKeyPrefix(prefix)
+
+ case None ⇒
+ originalProps
+ }
+
+ logger.debug("Configuring {} with props", configurable.getClass.getName)
+ MaybeEither(configurable configure localProps) match {
+ case Just(_) ⇒
+ logger.info("Configured {} with props", configurable.getClass.getName)
+ instance
+
+ case Failed(e) ⇒
+ throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
+ }
+
+ case _ ⇒
+ instance
+ }
- def policyStore = {
- _policyStoreM match {
- case Just(es) ⇒ es
- case _ ⇒ storeProvider.policyStore
+ case Failed(e) ⇒
+ throw new AquariumInternalError("Could not instantiate %s".format(className), e)
}
- }
- def storeProvider = _storeProvider
+ }
def currentResourcesMap: DSLResourcesMap = {
// FIXME: Get rid of this singleton stuff
// FIXME: Read from properties?
"default"
}
-
- def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
- val map = this.props.map
- val newMap = map.updated(Keys.store_provider_class, spc.getName)
- val newProps = new Props(newMap)
- new Aquarium(newProps)
- }
- def eventsStoreFolder = _eventsStoreFolder
+ def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
- def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events
+ def resourceEventStore = apply(EnvKeys.resourceEventStore)
- def saveIMEventsToEventsStoreFolder = _events_store_save_im_events
+ def imEventStore = apply(EnvKeys.imEventStore)
- def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match {
- case just @ Just(_) ⇒ just
- case _ ⇒ NoVal
- }
+ def userStateStore = apply(EnvKeys.userStateStore)
+
+ def policyStore = apply(EnvKeys.policyStore)
+
+ def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
+
+ def algorithmCompiler = apply(EnvKeys.algorithmCompiler)
+
+ def eventBus = apply(EnvKeys.eventBus)
+
+ def userStateComputations = apply(EnvKeys.userStateComputations)
+
+ def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
+
+ def adminCookie = apply(EnvKeys.adminCookie)
+
+ def converters = apply(EnvKeys.converters)
+
+ def actorProvider = apply(EnvKeys.actorProvider)
+
+ def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
+
+ def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
+
+ def timerService = apply(EnvKeys.timerService)
+
+ def restPort = apply(EnvKeys.restPort)
+
+ def version = apply(EnvKeys.version)
}
object Aquarium {
SysProp.FileEncoding
)
- implicit val DefaultConverters = TheDefaultConverters
-
- final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML
-
- final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP
-
- final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource
-
- final lazy val AquariumProperties = {
- val maybeProps = Props(AquariumPropertiesResource)
- maybeProps match {
- case Just(props) ⇒
- props
-
- case NoVal ⇒
- throw new AquariumInternalError(
- "Could not load %s from %s".format(
- ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
- AquariumPropertiesResource))
-
-
- case Failed(e) ⇒
- throw new AquariumInternalError(
- "Could not load %s from %s".format(
- ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
- AquariumPropertiesResource),
- e)
- }
- }
-
- /**
- * The main [[gr.grnet.aquarium.Aquarium]] instance.
- */
- final lazy val Instance = {
- Maybe(new Aquarium(AquariumProperties)) match {
- case Just(masterConf) ⇒
- masterConf
-
- case NoVal ⇒
- throw new AquariumInternalError(
- "Could not create Aquarium configuration from %s".format(
- AquariumPropertiesResource))
-
- case Failed(e) ⇒
- throw new AquariumInternalError(
- "Could not create Aquarium configuration from %s".format(
- AquariumPropertiesResource),
- e)
- }
- }
-
- /**
- * Defines the names of all the known keys inside the master properties file.
- */
- final object Keys {
+ object HTTP {
+ final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
+ final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
+ }
+
+ final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
+ override def toString = name
+ }
+
+ final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
+ EnvKeys.timerService,
+ EnvKeys.akkaService,
+ EnvKeys.actorProvider,
+ EnvKeys.eventBus,
+ EnvKeys.restService,
+ EnvKeys.rabbitMQService,
+ EnvKeys.storeWatcherService
+ )
+ object EnvKeys {
/**
* The Aquarium version. Will be reported in any due occasion.
*/
- final val version = "version"
+ final val version = StringKey("version")
- /**
- * The fully qualified name of the class that implements the `RoleableActorProviderService`.
- * Will be instantiated reflectively and should have a public default constructor.
- */
- final val actor_provider_class = "actor.provider.class"
-
- /**
- * The class that initializes the REST service
- */
- final val rest_service_class = "rest.service.class"
+ final val originalProps: TypedKey[Props] =
+ new AquariumEnvKey[Props]("originalProps")
/**
* The fully qualified name of the class that implements the `StoreProvider`.
* Will be instantiated reflectively and should have a public default constructor.
*/
- final val store_provider_class = "store.provider.class"
-
- /**
- * The class that implements the User store
- */
- final val user_state_store_class = "user.state.store.class"
+ final val storeProvider: TypedKey[StoreProvider] =
+ new AquariumEnvKey[StoreProvider]("store.provider.class")
/**
- * The class that implements the resource event store
- */
- final val resource_event_store_class = "resource.event.store.class"
-
- /**
- * The class that implements the IM event store
- */
- final val user_event_store_class = "user.event.store.class"
-
- /**
- * The class that implements the wallet entries store
- */
- final val policy_store_class = "policy.store.class"
-
-
- /** The lower mark for the UserActors' LRU.
- *
- * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
+ * If a value is given to this property, then it represents a folder where all events coming to aquarium are
+ * saved.
*
+ * This is for debugging purposes.
*/
- final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
+ final val eventsStoreFolder: TypedKey[Option[File]] =
+ new AquariumEnvKey[Option[File]]("events.store.folder")
/**
- * The upper mark for the UserActors' LRU.
+ * If this is `true` and `events.store.folder` is defined, then all resource events are
+ * also stored in `events.store.folder`.
*
- * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
+ * This is for debugging purposes.
*/
- final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
- /**
- * REST service listening port.
- *
- * Default is 8080.
- */
- final val rest_port = "rest.port"
+ final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
/**
- * Location of the Aquarium accounting policy config file
+ * If this is `true` and `events.store.folder` is defined, then all IM events are
+ * also stored in `events.store.folder`.
+ *
+ * This is for debugging purposes.
*/
- final val aquarium_policy = "aquarium.policy"
+ final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
/**
- * Location of the role-agreement mapping file
- */
- final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
-
- /**
* A time period in milliseconds for which we can tolerate stale parts regarding user state.
*
* The smaller the value, the more accurate the user credits and other state parts are.
* the timestamp of the last known balance amount by this value, then a re-computation for
* the balance is triggered.
*/
- final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
+ final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
/**
- * The time unit is the lowest billable time period.
- * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
- * seconds, then the user will be billed for ten seconds.
+ * REST service listening port.
*
- * This is an overall constant. We use it as a property in order to prepare ourselves for
- * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
- * infrastructures.
+ * Default is 8080.
*/
- final val time_unit_in_millis = "time.unit.in.seconds"
+ final val restPort = IntKey("rest.port")
/**
- * If a value is given to this property, then it represents a folder where all events coming to aquarium are
- * saved.
+ * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
+ * an authorised client.
*/
- final val events_store_folder = "events.store.folder"
+ final val adminCookie: TypedKey[Option[String]] =
+ new AquariumEnvKey[Option[String]]("admin.cookie")
- /**
- * If this is `true` and `events.store.folder` is defined, then all resource events are
- * also stored in `events.store.folder`.
- *
- * This is for debugging purposes.
- */
- final val events_store_save_rc_events = "events.store.save.rc.events"
+ final val resourceEventStore: TypedKey[ResourceEventStore] =
+ new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
- /**
- * If this is `true` and `events.store.folder` is defined, then all IM events are
- * also stored in `events.store.folder`.
- *
- * This is for debugging purposes.
- */
- final val events_store_save_im_events = "events.store.save.im.events"
+ final val imEventStore: TypedKey[IMEventStore] =
+ new AquariumEnvKey[IMEventStore]("im.event.store.class")
+
+ final val userStateStore: TypedKey[UserStateStore] =
+ new AquariumEnvKey[UserStateStore]("user.state.store.class")
+
+ final val policyStore: TypedKey[PolicyStore] =
+ new AquariumEnvKey[PolicyStore]("policy.store.class")
/**
- * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
- * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
+ * The class that initializes the REST service
*/
- final val save_unparsed_event_im = "save.unparsed.event.im"
+ final val restService: TypedKey[Lifecycle] =
+ new AquariumEnvKey[Lifecycle]("rest.service.class")
/**
- * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
- * an authorised client.
+ * The fully qualified name of the class that implements the `RoleableActorProviderService`.
+ * Will be instantiated reflectively and should have a public default constructor.
*/
- final val admin_cookie = "admin.cookie"
- }
+ final val actorProvider: TypedKey[RoleableActorProviderService] =
+ new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
+
+ final val akkaService: TypedKey[AkkaService] =
+ new AquariumEnvKey[AkkaService]("akka.service")
+
+ final val eventBus: TypedKey[EventBusService] =
+ new AquariumEnvKey[EventBusService]("event.bus.service")
+
+ final val timerService: TypedKey[TimerService] =
+ new AquariumEnvKey[TimerService]("timer.service")
+
+ final val rabbitMQService: TypedKey[RabbitMQService] =
+ new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
+
+ final val storeWatcherService: TypedKey[StoreWatcherService] =
+ new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
+
+ final val converters: TypedKey[Converters] =
+ new AquariumEnvKey[Converters]("converters")
+
+ final val algorithmCompiler: TypedKey[CostPolicyAlgorithmCompiler] =
+ new AquariumEnvKey[CostPolicyAlgorithmCompiler]("algorithm.compiler")
+
+ final val userStateComputations: TypedKey[UserStateComputations] =
+ new AquariumEnvKey[UserStateComputations]("user.state.computations")
+
+ final val defaultClassLoader: TypedKey[ClassLoader] =
+ new AquariumEnvKey[ClassLoader]("default.class.loader")
- object HTTP {
- final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
- final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
}
}
* or implied, of GRNET S.A.
*/
-package gr.grnet.aquarium.logic.test
-
-import org.junit.Assert._
-import org.junit.{Test}
-import io.Source
-import gr.grnet.aquarium.util.TestMethods
-import gr.grnet.aquarium.logic.accounting.{Policy, RoleAgreements}
+package gr.grnet.aquarium
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
/**
- * Tests for the [[gr.grnet.aquarium.logic.accounting.RoleAgreements]] class
*
- * @author Georgios Gousios <gousiosg@gmail.com>
+ * @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RoleAgreementsTest extends TestMethods {
-
- @Test
- def testParseMappings {
-
- var a = """
-
- # Some useless comment
-student=foobar # This should be ignored (no default policy)
-prof=default
- name=default
-%asd=default # This should be ignored (non accepted char)
-*=default
- """
-
- var src = Source.fromBytes(a.getBytes())
- var output = RoleAgreements.parseMappings(src)
-
- assertEquals(3, output.size)
- assertEquals("default", output.getOrElse("prof",null).name)
-
- // No default value
- a = """
- prof=default
- """
- src = Source.fromBytes(a.getBytes())
- assertThrows[RuntimeException](RoleAgreements.parseMappings(src))
- }
-
- @Test
- def testLoadMappings {
- // Uses the roles-agreements.map file in test/resources
- RoleAgreements.loadMappings
-
- assertEquals("default", RoleAgreements.agreementForRole("student").name)
- // Check that default policies are applied
- assertEquals("default", RoleAgreements.agreementForRole("foobar").name)
- }
+trait AquariumAware {
+ def awareOfAquariumEx(event: AquariumCreatedEvent): Unit
}
package gr.grnet.aquarium
-import store.memory.MemStore
-import store.mongodb.MongoDBStoreProvider
-import util.Loggable
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import com.google.common.eventbus.Subscribe
/**
- * Returns the appropriate Configurator implementation depending on value
- * of the test.store runtime parameter.
*
- * @author Georgios Gousios <gousiosg@gmail.com>
+ * @author Christos KK Loverdos <loverdos@gmail.com>
*/
-trait StoreConfigurator extends Loggable {
- def configurator: Aquarium =
- LogicTestsAssumptions.propertyValue(PropertyNames.TestStore) match {
- case "mem" => Aquarium.Instance.withStoreProviderClass(classOf[MemStore])
- case "mongo" => Aquarium.Instance.withStoreProviderClass(classOf[MongoDBStoreProvider])
- case _ =>
- logger.warn("Unknown store type, defaulting to \"mem\"")
- Aquarium.Instance.withStoreProviderClass(classOf[MemStore])
+trait AquariumAwareSkeleton extends AquariumAware {
+ // Holds the Aquarium delivered via awareOfAquariumEx; it stays null until that callback has run.
+ @volatile protected var _aquarium: Aquarium = null
+
+ // Accessor for the stored instance (null if no AquariumCreatedEvent has been received yet).
+ final def aquarium = _aquarium
+
+ // Handler annotated with Guava's @Subscribe: remembers the Aquarium carried by the event.
+ @Subscribe
+ def awareOfAquariumEx(event: AquariumCreatedEvent) = {
+ this._aquarium = event.aquarium
}
}
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium
+
+import com.ckkloverdos.key.{BooleanKey, TypedKey}
+import com.ckkloverdos.env.Env
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.maybe.{MaybeOption, Failed, MaybeEither, Just, NoVal}
+import gr.grnet.aquarium.util.Loggable
+import java.io.File
+import gr.grnet.aquarium.store.StoreProvider
+import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.computation.UserStateComputations
+import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, AkkaService, SimpleTimerService, EventBusService}
+import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+
+/**
+ * Create a tailored Aquarium.
+ *
+ * Thread-unsafe.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class AquariumBuilder(val originalProps: Props) extends Loggable {
+ if(originalProps eq null) {
+ throw new AquariumInternalError("props is null")
+ }
+
+ import Aquarium.EnvKeys
+
+ private[this] var _env = Env()
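+ // The event bus is created eagerly (instead of being resolved like the other env values),
+ // because newInstance() registers every reflectively created object on it.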
+ private[this] val eventBus = new EventBusService
+
+ @volatile
+ private[this] var _aquarium: Aquarium = _
+
+ /**
+  * Looks up `key` in `originalProps` via `getEx`.
+  *
+  * Any exception raised by the lookup is rethrown as an [[AquariumInternalError]] that names
+  * the key and keeps the original exception as its cause, so the root problem is not lost.
+  *
+  * @param key the property name
+  * @return the property value
+  */
+ @throws(classOf[AquariumInternalError])
+ private def propsGetEx(key: String): String = {
+   try originalProps.getEx(key)
+   catch {
+     case e: Exception ⇒
+       throw new AquariumInternalError("Could not locate %s in Aquarium properties".format(key), e)
+   }
+ }
+
+ /**
+  * Looks up `key` in the environment being built via `getEx`.
+  *
+  * Any exception raised by the lookup is rethrown as an [[AquariumInternalError]] that names
+  * the key and keeps the original exception as its cause.
+  *
+  * @param key the typed environment key
+  * @return the value stored under `key`
+  */
+ @throws(classOf[AquariumInternalError])
+ private def envGetEx[T: Manifest](key: TypedKey[T]): T = {
+   try _env.getEx(key)
+   catch {
+     case e: Exception ⇒
+       throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key), e)
+   }
+ }
+
+ /**
+  * Adds the typed key/value pair to the environment being built (`_env += keyvalue`)
+  * and returns this builder, so that calls can be chained.
+  *
+  * @param keyvalue the (key, value) pair; asserted to be non-null
+  */
+ def update[T: Manifest](keyvalue: (TypedKey[T], T)): this.type = {
+ assert(keyvalue ne null, "keyvalue ne null")
+
+ _env += keyvalue
+ this
+ }
+
+ /**
+  * Two-argument convenience form: pairs `key` with `value` and delegates to the
+  * tuple-based `update`.
+  *
+  * @param key   the typed environment key; asserted to be non-null
+  * @param value the value to store under `key`
+  * @return this builder
+  */
+ def update[T : Manifest](key: TypedKey[T], value: T): this.type = {
+   assert(key ne null, "key ne null")
+
+   val keyvalue = (key, value)
+   update(keyvalue)
+ }
+
+ /**
+  * Reflectively provide a new instance of a class and configure it appropriately.
+  *
+  * The class is instantiated through its no-arg constructor and the new object is registered
+  * on the builder's event bus. If the object is [[Configurable]] it is then configured with
+  * the original properties, restricted to its property prefix when it declares one.
+  *
+  * @param manifest  only fixes the result type `C`; it is not inspected
+  * @param className fully qualified name of the class to instantiate
+  * @return the new (and possibly configured) instance
+  * @throws AquariumInternalError if the class cannot be instantiated or configured
+  */
+ private[this] def newInstance[C <: AnyRef](manifest: Manifest[C], className: String): C = {
+   // The thread context class loader is allowed to be null; fall back to the loader of this class.
+   val classLoader =
+     Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
+
+   MaybeEither(classLoader.loadClass(className).newInstance().asInstanceOf[C]) match {
+     case Failed(e) ⇒
+       throw new AquariumInternalError("Could not instantiate %s".format(className), e)
+
+     case Just(instance) ⇒
+       // Registered before configuration, so the instance receives bus events (e.g. the
+       // AquariumCreatedEvent posted by build()).
+       eventBus.addSubscriber(instance)
+
+       instance match {
+         // originalProps is guaranteed non-null by the constructor check, so no guard is needed.
+         case configurable: Configurable ⇒
+           val localProps = configurable.propertyPrefix match {
+             case somePrefix @ Some(prefix) ⇒
+               if(prefix.length == 0) {
+                 logger.warn(
+                   "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
+               }
+
+               originalProps.subsetForKeyPrefix(prefix)
+
+             case None ⇒
+               originalProps
+           }
+
+           logger.debug("Configuring {} with props", configurable.getClass.getName)
+           MaybeEither(configurable configure localProps) match {
+             case Just(_) ⇒
+               logger.info("Configured {} with props", configurable.getClass.getName)
+               instance
+
+             case Failed(e) ⇒
+               throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
+           }
+
+         case _ ⇒
+           instance
+       }
+   }
+ }
+
+ /**
+  * Makes sure a store provider is present in the environment.
+  *
+  * An explicitly set provider is left alone; otherwise the class named by the provider
+  * property is instantiated reflectively. A missing property is an error.
+  */
+ private[this] def checkStoreProviderOverride: Unit = {
+   val providerKey = EnvKeys.storeProvider
+
+   if(!_env.contains(providerKey)) {
+     if(originalProps eq null) {
+       throw new AquariumInternalError("Cannot locate store provider, since no properties have been defined")
+     }
+
+     val propName = providerKey.name
+     originalProps.get(propName) match {
+       case Failed(e) ⇒
+         throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+
+       case NoVal ⇒
+         throw new AquariumInternalError("No store provider is given in properties")
+
+       case Just(providerClassName) ⇒
+         update(providerKey, newInstance(providerKey.keyType, providerClassName))
+     }
+   }
+ }
+
+ /**
+  * Resolves the individual stores (resource events, IM events, user state, policy).
+  *
+  * For each store not already present in the environment: if a property names a class, that
+  * class is instantiated reflectively; if the property is absent, the store is taken from the
+  * store provider found in the environment.
+  */
+ private[this] def checkStoreOverrides: Unit = {
+   def checkOverride[S <: AnyRef : Manifest](envKey: TypedKey[S], f: StoreProvider ⇒ S): Unit = {
+     if(!_env.contains(envKey)) {
+       val propName = envKey.name
+
+       originalProps.get(propName) match {
+         case Failed(e) ⇒
+           throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+
+         case Just(storeClassName) ⇒
+           // Create the store reflectively
+           update(envKey, newInstance(envKey.keyType, storeClassName))
+
+         case NoVal ⇒
+           // Get the store from the store provider
+           update(envKey, f(this.envGetEx(EnvKeys.storeProvider)))
+       }
+     }
+   }
+
+   if(originalProps ne null) {
+     // If a store has not been specifically overridden, we load it from the properties
+     checkOverride(EnvKeys.resourceEventStore, _.resourceEventStore)
+     checkOverride(EnvKeys.imEventStore,       _.imEventStore)
+     checkOverride(EnvKeys.userStateStore,     _.userStateStore)
+     checkOverride(EnvKeys.policyStore,        _.policyStore)
+   }
+ }
+
+ /**
+  * Resolves the optional events store folder.
+  *
+  * A value already present in the environment (even `None`) is kept. Otherwise the folder is
+  * read from the properties: a present property yields `Some(folder)`, an absent one yields
+  * `None`. The key is therefore always defined afterwards, which
+  * `checkEventsStoreFolderExistence` relies on when it reads it back from the environment.
+  *
+  * @throws AquariumInternalError if the environment or the properties cannot be queried
+  */
+ private[this] def checkEventsStoreFolderOverride: Unit = {
+   val propName = EnvKeys.eventsStoreFolder.name
+
+   _env.get(EnvKeys.eventsStoreFolder) match {
+     case Just(storeFolderOption) ⇒
+       // Some value has been set, even a None, so do nothing more
+       logger.info("{} = {}", propName, storeFolderOption)
+
+     case Failed(e) ⇒
+       throw new AquariumInternalError(e, "While obtaining value for env key %s".format(propName))
+
+     case NoVal ⇒
+       // originalProps is guaranteed non-null by the constructor check.
+       originalProps.get(propName) match {
+         case Just(folderName) ⇒
+           logger.info("{} = {}", propName, folderName)
+           update(EnvKeys.eventsStoreFolder, Some(new File(folderName)))
+
+         case NoVal ⇒
+           // Not configured at all: record the absence explicitly instead of leaving the key unset.
+           update(EnvKeys.eventsStoreFolder, None)
+
+         case Failed(e) ⇒
+           // Previously swallowed by the for-comprehension; report it like the other lookups do.
+           throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+       }
+   }
+ }
+
+ /**
+  * Validates and prepares the events store folder, when one is configured.
+  *
+  * The folder is made canonical (relative paths are resolved against Aquarium Home), must not
+  * be an existing non-directory, must not be Aquarium Home itself or lie inside it, and is
+  * created if missing. The canonical folder is then written back to the environment.
+  *
+  * @throws AquariumInternalError if the folder is not a directory or is under Aquarium Home
+  */
+ private[this] def checkEventsStoreFolderExistence: Unit = {
+   val propName = EnvKeys.eventsStoreFolder.name
+
+   this.envGetEx(EnvKeys.eventsStoreFolder).foreach { folder ⇒
+     val canonicalFolder = {
+       if(folder.isAbsolute) {
+         folder.getCanonicalFile
+       } else {
+         logger.info("{} is not absolute, making it relative to Aquarium Home", propName)
+         new File(ResourceLocator.Homes.Folders.AquariumHome, folder.getPath).getCanonicalFile
+       }
+     }
+
+     val canonicalPath = canonicalFolder.getCanonicalPath
+
+     if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
+       throw new AquariumInternalError("%s = %s is not a folder".format(propName, canonicalFolder))
+     }
+
+     // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
+     // we still want to keep the events.
+     val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
+     // Compare on path-component boundaries: a plain startsWith would also reject a sibling
+     // such as "<home>-events", which merely shares the textual prefix.
+     val ahPrefix =
+       if(ahCanonicalPath.endsWith(File.separator)) ahCanonicalPath
+       else ahCanonicalPath + File.separator
+     if(canonicalPath == ahCanonicalPath || canonicalPath.startsWith(ahPrefix)) {
+       throw new AquariumInternalError(
+         "%s = %s is under Aquarium Home = %s".format(
+           propName,
+           canonicalFolder,
+           ahCanonicalPath
+         ))
+     }
+
+     // mkdirs() returns false both when the folder already exists and when creation failed;
+     // only the latter is worth reporting. Kept non-fatal, as before.
+     if(!canonicalFolder.mkdirs() && !canonicalFolder.isDirectory) {
+       logger.warn("Could not create {} = {}", propName, canonicalFolder)
+     }
+
+     update(EnvKeys.eventsStoreFolder, Some(canonicalFolder))
+   }
+ }
+
+ /**
+  * Resolves the two boolean flags that control saving of resource and IM events.
+  *
+  * A flag already present in the environment is kept; otherwise it is read from the
+  * properties and defaults to `false` when the property is absent.
+  */
+ private[this] def checkEventsStoreFolderVariablesOverrides: Unit = {
+   def checkVar(envKey: BooleanKey): Unit = {
+     if(!_env.contains(envKey)) {
+       val propName = envKey.name
+       val flag = originalProps.getBoolean(propName) match {
+         case Just(propValue) ⇒
+           propValue
+
+         case NoVal ⇒
+           false
+
+         case Failed(e) ⇒
+           throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+       }
+
+       update(envKey, flag)
+     }
+   }
+
+   checkVar(EnvKeys.eventsStoreSaveRCEvents)
+   checkVar(EnvKeys.eventsStoreSaveIMEvents)
+ }
+
+ /**
+  * Unless a REST service is already in the environment, instantiates the class named by the
+  * REST service property (looked up with `propsGetEx`, so the property must be present).
+  */
+ private[this] def checkRestServiceOverride: Unit = {
+   checkNoPropsOverride(EnvKeys.restService) { restKey ⇒
+     // restKey is the very key passed above; no need to re-read it from EnvKeys.
+     newInstance(restKey.keyType, propsGetEx(restKey.name))
+   }
+ }
+
+ /**
+  * Stores the value computed by `f` under `envKey`, unless the environment already holds a
+  * value for that key. The properties are not consulted.
+  *
+  * @param envKey the key to resolve
+  * @param f      computes the default value; only invoked when the key is not yet set
+  */
+ private[this] def checkNoPropsOverride[T: Manifest](envKey: TypedKey[T])(f: TypedKey[T] ⇒ T): Unit = {
+   val alreadySet = _env.contains(envKey)
+   if(!alreadySet) {
+     update(envKey, f(envKey))
+   }
+ }
+
+ /**
+  * Resolves a mandatory, property-backed key.
+  *
+  * If the environment does not hold `envKey` yet, the property with the key's name is read and
+  * converted by `f`; an absent property is an error.
+  *
+  * @param envKey the key to resolve
+  * @param f      converts the raw property string into the typed value
+  */
+ private[this] def checkPropsOverride[T: Manifest](envKey: TypedKey[T])(f: (TypedKey[T], String) ⇒ T): Unit = {
+   if(!_env.contains(envKey)) {
+     val propName = envKey.name
+     val resolved: T = originalProps.get(propName) match {
+       case Just(propValue) ⇒
+         f(envKey, propValue)
+
+       case NoVal ⇒
+         throw new AquariumInternalError("No value for key %s in properties".format(propName))
+
+       case Failed(e) ⇒
+         throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+     }
+
+     update(envKey, resolved)
+   }
+ }
+
+ /**
+  * Resolves an optional, property-backed key.
+  *
+  * If the environment does not hold `envKey` yet, the property with the key's name is read and
+  * converted by `f`; an absent property results in `None` being stored.
+  *
+  * @param envKey the key to resolve
+  * @param f      converts the raw property string into the optional typed value
+  */
+ private[this] def checkOptionalPropsOverride[T: Manifest]
+     (envKey: TypedKey[Option[T]])
+     (f: (TypedKey[Option[T]], String) ⇒ Option[T]): Unit = {
+
+   if(!_env.contains(envKey)) {
+     val propName = envKey.name
+     val resolved: Option[T] = originalProps.get(propName) match {
+       case Just(propValue) ⇒
+         f(envKey, propValue)
+
+       case NoVal ⇒
+         None
+
+       case Failed(e) ⇒
+         throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+     }
+
+     update(envKey, resolved)
+   }
+ }
+
+ /**
+  * Builds the Aquarium instance, or returns the one built by an earlier call.
+  *
+  * Every environment key is resolved in turn: a value set beforehand through `update` is kept,
+  * otherwise the value comes from the properties or from a built-in default. The resulting
+  * environment is handed to a new Aquarium, and an AquariumCreatedEvent is posted
+  * synchronously on its event bus.
+  *
+  * The order of the steps matters; see the inline notes.
+  */
+ def build(): Aquarium = {
+ // Already built: hand out the same instance.
+ if(this._aquarium ne null) {
+ return this._aquarium
+ }
+
+ checkPropsOverride(EnvKeys.version) { (envKey, propValue) ⇒ propValue }
+
+ checkNoPropsOverride(EnvKeys.eventBus) { _ ⇒ eventBus }
+
+ checkNoPropsOverride(EnvKeys.originalProps) { _ ⇒ originalProps }
+
+ checkNoPropsOverride(EnvKeys.defaultClassLoader) { _ ⇒ Thread.currentThread().getContextClassLoader }
+
+ checkNoPropsOverride(EnvKeys.converters) { _ ⇒ StdConverters.AllConverters }
+
+ // The provider must be resolved first: stores without their own property are taken from it.
+ checkStoreProviderOverride
+ checkStoreOverrides
+
+ // The override step must precede the existence check, which reads the folder key from the env.
+ checkEventsStoreFolderOverride
+ checkEventsStoreFolderExistence
+ checkEventsStoreFolderVariablesOverrides
+
+ checkRestServiceOverride
+
+ // Services with a fixed default implementation, instantiated reflectively by class name.
+ checkNoPropsOverride(EnvKeys.timerService) { envKey ⇒
+ newInstance(envKey.keyType, classOf[SimpleTimerService].getName)
+ }
+
+ checkNoPropsOverride(EnvKeys.algorithmCompiler) { _ ⇒ SimpleCostPolicyAlgorithmCompiler }
+
+ checkNoPropsOverride(EnvKeys.userStateComputations) { envKey ⇒
+ newInstance(envKey.keyType, classOf[UserStateComputations].getName)
+ }
+
+ checkNoPropsOverride(EnvKeys.akkaService) { envKey ⇒
+ newInstance(envKey.keyType, classOf[AkkaService].getName)
+ }
+
+ checkNoPropsOverride(EnvKeys.rabbitMQService) { envKey ⇒
+ newInstance(envKey.keyType, classOf[RabbitMQService].getName)
+ }
+
+ checkNoPropsOverride(EnvKeys.storeWatcherService) { envKey ⇒
+ newInstance(envKey.keyType, classOf[StoreWatcherService].getName)
+ }
+
+ // Values that must be present in the properties unless set explicitly beforehand.
+ checkPropsOverride(EnvKeys.actorProvider) { (envKey, propValue) ⇒
+ newInstance(envKey.keyType, propValue)
+ }
+
+ // NOTE(review): toLong/toInt throw NumberFormatException on malformed property values.
+ checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒
+ propValue.toLong
+ }
+
+ checkPropsOverride(EnvKeys.restPort) { (envKey, propValue) ⇒
+ propValue.toInt
+ }
+
+ checkOptionalPropsOverride(EnvKeys.adminCookie) { (envKey, propValue) ⇒
+ Some(propValue)
+ }
+
+ this._aquarium = new Aquarium(_env)
+
+ // Synchronous post: subscribers registered by newInstance() have received the event
+ // before build() returns.
+ this._aquarium.eventBus.syncPost(AquariumCreatedEvent(this._aquarium))
+
+ this._aquarium
+ }
+}
import ch.qos.logback.classic.LoggerContext
import ch.qos.logback.classic.joran.JoranConfigurator
import com.ckkloverdos.maybe.Just
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
/**
* Main method for Aquarium
}
}
- def doStart(): Unit = {
- import ResourceLocator.SysEnvs
-
- // We have AKKA builtin, so no need to mess with pre-existing installation.
- if(SysEnvs.AKKA_HOME.value.isJust) {
- val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME))
- logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
- throw error
- }
-
- Aquarium.Instance.start()
- }
-
def main(args: Array[String]) = {
configureLogging()
logStarting("Aquarium")
val ms0 = TimeHelpers.nowMillis()
try {
- doStart()
+ val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build()
+ aquarium.start()
+
val ms1 = TimeHelpers.nowMillis()
- logStarted(ms0, ms1, "Aquarium")
+ logStarted(ms0, ms1, "Aquarium [%s]", aquarium.version)
logSeparator()
} catch {
case e: Throwable ⇒
import gr.grnet.aquarium.util.justForSure
import gr.grnet.aquarium.util.isRunningTests
import com.ckkloverdos.resource.{FileStreamResource, StreamResource, CompositeStreamResourceContext, ClassLoaderStreamResourceContext, FileStreamResourceContext}
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.convert.Converters._
+import com.ckkloverdos.maybe.Just
+import com.ckkloverdos.maybe.Failed
+import com.ckkloverdos.convert.Converters
/**
* Locates resources.
}
}
+ /**
+  * The Aquarium properties, loaded on first access from `Resources.AquariumPropertiesResource`.
+  *
+  * Accessing it throws an [[AquariumInternalError]] if the resource cannot be loaded.
+  */
+ final lazy val AquariumProperties = {
+   // Converters picked up implicitly by Props(...).
+   implicit val DefaultConverters = Converters.DefaultConverters
+
+   // Both failure branches report the same message.
+   def loadFailure = "Could not load %s from %s".format(
+     ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+     Resources.AquariumPropertiesResource)
+
+   Props(Resources.AquariumPropertiesResource) match {
+     case Just(props) ⇒
+       props
+
+     case NoVal ⇒
+       throw new AquariumInternalError(loadFailure)
+
+     case Failed(e) ⇒
+       throw new AquariumInternalError(loadFailure, e)
+   }
+ }
+
+
def getResource(what: String): Maybe[StreamResource] = {
ResourceContexts.MasterResourceContext.getResource(what)
}
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-trait RoleableActor extends Actor with Loggable {
+trait RoleableActor extends Actor with Loggable with AquariumAwareSkeleton {
def role: ActorRole
override def toString = "%s@%s(%s)".format(shortClassNameOf(this), System.identityHashCode(this), role.role)
import RESTPaths._
import gr.grnet.aquarium.util.date.TimeHelpers
import org.joda.time.format.ISODateTimeFormat
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
-import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, ActorMessage, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest}
import gr.grnet.aquarium.{ResourceLocator, Aquarium}
import com.ckkloverdos.resource.StreamResource
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import com.ckkloverdos.maybe.Failed
import java.net.InetAddress
-import gr.grnet.aquarium.event.model.{ExternalEventModel, EventModel}
+import gr.grnet.aquarium.event.model.ExternalEventModel
/**
* Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
final val TEXT_PLAIN = "text/plain"
final val APPLICATION_JSON = "application/json"
-
- private[this] def aquarium = Aquarium.Instance
-
private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = {
HttpResponse(
status,
)( f: RequestResponder ⇒ Unit): Unit = {
aquarium.adminCookie match {
- case Just(adminCookie) ⇒
+ case Some(adminCookie) ⇒
headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match {
case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒
try f(responder)
responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
}
- case NoVal ⇒
+ case None ⇒
responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN))
}
}
private[this]
def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
- val aquarium = Aquarium.Instance
val actorProvider = aquarium.actorProvider
val router = actorProvider.actorForRole(RouterRole)
val futureResponse = router ask message
import gr.grnet.aquarium.event.model.im.IMEventModel
import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
-import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
-import gr.grnet.aquarium.{AquariumInternalError, Aquarium}
+import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
+import gr.grnet.aquarium.AquariumInternalError
import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
import gr.grnet.aquarium.computation.BillingMonthInfo
import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
/**
*
def role = UserActorRole
- private[this] def aquarium: Aquarium = Aquarium.Instance
-
private[this] def userStateComputations = aquarium.userStateComputations
private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ {
}
private[this] def _timestampTheshold = {
- aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(1000L * 60 * 5 /* 5 minutes */)
+ aquarium.userStateTimestampThreshold
}
private[this] def haveUserState = {
import gr.grnet.aquarium.computation.state.parts._
import gr.grnet.aquarium.event.model.NewWalletEntry
import gr.grnet.aquarium.event.model.resource.ResourceEventModel
-import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, AquariumAware, AquariumInternalError}
import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import com.google.common.eventbus.Subscribe
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
-
- lazy val aquarium = _aquarium
-
- lazy val storeProvider = aquarium.storeProvider
- lazy val timeslotComputations = new TimeslotComputations {}
+final class UserStateComputations extends AquariumAwareSkeleton with Loggable {
+ lazy val timeslotComputations = new TimeslotComputations {} // FIXME
lazy val algorithmCompiler = aquarium.algorithmCompiler
- lazy val policyStore = storeProvider.policyStore
- lazy val userStateStoreForRead = storeProvider.userStateStore
- lazy val resourceEventStore = storeProvider.resourceEventStore
+ lazy val policyStore = aquarium.policyStore
+ lazy val userStateStoreForRead = aquarium.userStateStore
+ lazy val resourceEventStore = aquarium.resourceEventStore
def findUserStateAtEndOfBillingMonth(
userStateBootstrap: UserStateBootstrap,
*/
validTo: Long = Long.MaxValue
) {
-
- try {
- require(
- validFrom <= validTo,
- "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
- }
- catch {
- case e: IllegalArgumentException ⇒
- // TODO Remove this
- Aquarium.Instance.debug(this, "!! validFrom = %s, validTo = %s, dx=%s", validFrom, validTo, validTo-validFrom)
- throw e
- }
+ require(
+ validFrom <= validTo,
+ "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
require(name ne null, "Name is not null")
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
-
- /**
- * Specifies what we do with the message payload.
- */
- handler: PayloadHandler,
-
- /**
- * Specifies how we execute the handler
- */
- executor: PayloadHandlerExecutor,
-
- /**
- * After the payload is processed, we call this function with ourselves and the result.
- */
- notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
+class RabbitMQConsumer(
+ val aquarium: Aquarium,
+ val conf: RabbitMQConsumerConf,
+
+ /**
+ * Specifies what we do with the message payload.
+ */
+ handler: PayloadHandler,
+
+ /**
+ * Specifies how we execute the handler
+ */
+ executor: PayloadHandlerExecutor,
+
+ /**
+ * After the payload is processed, we call this function with ourselves and the result.
+ */
+ notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
) extends Loggable with Lifecycle { consumerSelf ⇒
private[this] var _factory: ConnectionFactory = _
case object LifecycleStartReason extends StartReason
case object PingStartReason extends StartReason
- private[this] def timerService = Aquarium.Instance.timerService
+ private[this] def timerService = aquarium.timerService
private[this] lazy val servers = {
conf.connectionConf(RabbitMQConKeys.servers)
safeStop()
}
- private[this] def aquarium = Aquarium.Instance
-
private[this] def postBusError(event: BusEvent): Unit = {
aquarium.eventBus ! event
}
package gr.grnet.aquarium.logic.accounting
import dsl.{Timeslot, DSLPolicy, DSL}
-import gr.grnet.aquarium.Aquarium._
import java.io.{InputStream, FileInputStream, File}
import java.util.Date
import gr.grnet.aquarium.util.Loggable
import java.util.concurrent.atomic.AtomicReference
-import gr.grnet.aquarium.Aquarium.Keys
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
import collection.immutable.{TreeMap, SortedMap}
-import gr.grnet.aquarium.{AquariumException, Aquarium}
+import gr.grnet.aquarium.{ResourceLocator, AquariumAwareSkeleton, AquariumException, Aquarium}
import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
+import com.ckkloverdos.maybe.{Failed, Just}
/**
* Searches for and loads the applicable accounting policy
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-object Policy extends DSL with Loggable {
+object Policy extends DSL with Loggable with AquariumAwareSkeleton {
/* Pointer to the latest policy */
private[logic] lazy val policies = {
*/
private[logic] def reloadPolicies: SortedMap[Timeslot, DSLPolicy] =
if (config == null)
- reloadPolicies(Instance)
+ reloadPolicies(aquarium)
else
reloadPolicies(config)
//2. Check whether policy file has been updated
val latestPolicyChange = if (pol.isEmpty) 0 else pol.last.validFrom
- val policyf = Instance.findConfigFile(PolicyConfName, Keys.aquarium_policy, PolicyConfName)
+
+ val policyf = ResourceLocator.Resources.PolicyYAMLResource
var updated = false
if (policyf.exists) {
- if (policyf.lastModified > latestPolicyChange) {
- logger.info("Policy file updated since last check, reloading")
updated = true
- } else {
- logger.info("Policy file not changed since last check")
- }
- } else {
+ } else {
logger.warn("User specified policy file %s does not exist, " +
- "using stored policy information".format(policyf.getAbsolutePath))
+ // Infix `format` binds looser than `+`, so it is applied to the whole concatenated message.
+ // With `.format` only the second literal (which has no %s) was formatted and the
+ // placeholder in the first literal was logged verbatim.
+ "using stored policy information" format policyf.url)
}
if (updated) {
val ts = TimeHelpers.nowMillis()
- val parsedNew = loadPolicyFromFile(policyf)
+ val parsedNewM = policyf.mapInputStream(parse).toMaybeEither
+ val parsedNew = parsedNewM match {
+ case Just(parsedNew) ⇒ parsedNew
+ case Failed(e) ⇒ throw e
+ }
val newPolicy = parsedNew.toPolicyEntry
config.policyStore.findPolicyEntry(newPolicy.id) match {
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.logic.accounting
-
-import dsl.DSLAgreement
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.Aquarium
-import gr.grnet.aquarium.Aquarium.{Instance, Keys}
-import io.Source
-import java.io.{InputStream, File}
-import java.util.regex.Pattern
-
-/**
- * Encapsulates mappings from user roles to Aquarium policies. The mappings
- * are used during new user registration to automatically set a policy to
- * a user according to its role.
- *
- * The configuration is read from a configuration file pointed to by the
- * main Aquarium configuration file. The
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-object RoleAgreements extends Loggable {
-
- private var mappings: Map[String, DSLAgreement] = loadMappings
-
- /**
- * Returns the agreement that matches the provided role. The search for a
- * matching agreement is done with the current version of the Aquarium
- * policy.
- */
- def agreementForRole(role : String) = mappings.get(role.toLowerCase) match {
- case Some(x) => x
- case None => mappings.get("*").getOrElse(
- throw new RuntimeException("Cannot find agreement for default role *"))
- }
-
- /**
- * Trigger reloading of the mappings file.
- */
- def reloadMappings = mappings = loadMappings
-
- /**
- * Load and parse the mappings file
- */
- private[logic] def loadMappings = synchronized {
- val config = Aquarium.Instance.get(Keys.aquarium_role_agreement_map)
- val configFile = Aquarium.Instance.findConfigFile(
- Aquarium.RolesAgreementsName, Keys.aquarium_role_agreement_map,
- Aquarium.RolesAgreementsName)
-
- def loadFromClasspath: Source = {
- getClass.getClassLoader.getResourceAsStream(Aquarium.RolesAgreementsName) match {
- case x: InputStream =>
- logger.warn("Using default role to agreement mappings, this is " +
- "problably not what you want")
- Source.fromInputStream(x)
- case null =>
- logger.error("No valid role to agreement mappings configuration found, " +
- "Aquarium will fail")
- null
- }
- }
-
- val source = if (configFile.exists && configFile.isFile) {
- if (configFile.isFile)
- Source.fromFile(configFile)
- else {
- logger.warn(("Configured file %s is a directory. " +
- "Trying the default one.").format(config))
- loadFromClasspath
- }
- } else {
- logger.warn("Configured file %s for role-agreement mappings cannot " +
- "be found. Trying the default one.".format(config))
- loadFromClasspath
- }
-
- parseMappings(source)
- }
-
- def parseMappings(src: Source) = {
- val p = Pattern.compile("^\\s*([\\*a-zA-Z0-9-_]+)\\s*=\\s*([a-zA-Z0-9-_]+).*$")
-
- val mappings = src.getLines.foldLeft(Map[String, DSLAgreement]()) {
- (acc, l) =>
- l match {
- case x if (x.matches("^\\s*$")) => acc
- case x if (x.matches("^\\s*\\#")) => acc
- case x if (p.matcher(x).find()) =>
- // Ugly code warning
- val m = p.matcher(x)
- m.find()
- val role = m.group(1)
- val agrName = m.group(2)
- Policy.policy.findAgreement(agrName) match {
- case Some(x) => acc ++ Map(role -> x)
- case None =>
- logger.warn("No agreement with name %s".format(agrName))
- acc
- }
- case _ => acc
- }
- }
- if (!mappings.keysIterator.contains("*"))
- throw new RuntimeException("Cannot find agreement for default role *")
- mappings
- }
-}
import akka.actor.Actor
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.ResourceLocator.SysEnvs
+import gr.grnet.aquarium.AquariumInternalError
/**
* A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
final class AkkaService extends Lifecycle with Loggable {
def start() = {
+ // Akka is bundled with Aquarium: refuse to start while AKKA_HOME is set in the environment.
+ if(SysEnvs.AKKA_HOME.value.isJust) {
+   val akkaHomeName = SysEnvs.Names.AKKA_HOME
+   val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(akkaHomeName))
+   // Log first, then propagate the very same error to the caller.
+   logger.error("%s is set".format(akkaHomeName), error)
+   throw error
+ }
}
def stop()= {
import gr.grnet.aquarium.Configurable
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.service.event.BusEvent
-import com.google.common.eventbus.{AsyncEventBus, DeadEvent, Subscribe}
+import com.google.common.eventbus.{EventBus, AsyncEventBus, DeadEvent, Subscribe}
import gr.grnet.aquarium.util.{DaemonThreadFactory, Lifecycle, Loggable}
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.Collections
*/
class EventBusService extends Loggable with Lifecycle with Configurable {
+ private[this] val className = classOf[EventBusService].getName
private[this] val asyncBus = new AsyncEventBus(
- classOf[EventBusService].getName,
+ "%s/async".format(className),
Executors.newFixedThreadPool(1, new DaemonThreadFactory)
)
+ // Synchronous counterpart of asyncBus, named like it: the "%s" placeholder must be filled
+ // with the class name, otherwise the bus is literally called "%s/sync".
+ private[this] val syncBus = new EventBus("%s/sync".format(className))
+
private[this] val subscribers = Collections.newSetFromMap[AnyRef](new ConcurrentHashMap())
def propertyPrefix = None
def stop() = synchronized {
val iterator = subscribers.iterator()
while(iterator.hasNext) {
- asyncBus.unregister(iterator.next())
+ val subscriber = iterator.next()
+ asyncBus.unregister(subscriber)
+ syncBus.unregister(subscriber)
}
subscribers.clear()
}
- @inline
- def post[A <: BusEvent](event: A): Unit = {
- this ! event
+ /**
+ * Posts an event synchronously.
+ */
+ def syncPost[A <: BusEvent](event: A): Unit = {
+ syncBus.post(event)
}
+ /**
+ * Post an event asynchronously.
+ */
def ![A <: BusEvent](event: A): Unit = {
asyncBus.post(event)
}
+ /**
+  * Stops tracking the subscriber and unregisters it from both the
+  * asynchronous and the synchronous bus.
+  */
def removeSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized {
subscribers.remove(subscriber)
asyncBus.unregister(subscriber)
+ // Must be unregister (not register): addSubscriber registers on both buses,
+ // so removal has to undo both registrations.
+ syncBus.unregister(subscriber)
}
def addSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized {
subscribers.add(subscriber)
asyncBus.register(subscriber)
+ syncBus.register(subscriber)
}
@Subscribe
import _root_.akka.actor._
import cc.spray.can.{ServerConfig, HttpClient, HttpServer}
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.{AquariumInternalError, Aquarium}
+import gr.grnet.aquarium.{Configurable, AquariumAwareSkeleton, Aquarium}
+import com.ckkloverdos.props.Props
/**
* REST service based on Actors and Spray.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-class RESTActorService extends Lifecycle with Loggable {
+class RESTActorService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
private[this] var _port: Int = 8080
private[this] var _restActor: ActorRef = _
private[this] var _serverActor: ActorRef = _
private[this] var _clientActor: ActorRef = _
- def start(): Unit = {
- val aquarium = Aquarium.Instance
- this._port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr(
- throw new AquariumInternalError(
- "%s was not specified in Aquarium properties".format(Aquarium.Keys.rest_port)))
- logger.debug("Starting on port %s".format(this._port))
+ def propertyPrefix = Some(RESTActorService.Prefix)
+
+ /**
+ * Configure this instance with the provided properties.
+ *
+ * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+ */
+ def configure(props: Props) {
+ this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
+ logger.debug("HTTP port is %s".format(this._port))
+ }
+
+ def start(): Unit = {
+ logger.info("Starting HTTP on port %s".format(this._port))
this._restActor = aquarium.actorProvider.actorForRole(RESTRole)
// Start Spray subsystem
}
def stop(): Unit = {
+ logger.info("Stopping HTTP on port %s".format(this._port))
+
this._serverActor.stop()
this._clientActor.stop()
}
+}
+
+object RESTActorService {
+ final val Prefix = "rest"
}
\ No newline at end of file
import com.ckkloverdos.props.Props
import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{Aquarium, Configurable}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
import gr.grnet.aquarium.converter.StdConverters
import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RabbitMQService extends Loggable with Lifecycle with Configurable {
+class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
@volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
@volatile private[this] var _consumers = List[RabbitMQConsumer]()
def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
- def aquarium = Aquarium.Instance
-
def eventBus = aquarium.eventBus
def resourceEventStore = aquarium.resourceEventStore
*/
def configure(props: Props) = {
this._props = props
+ }
+
+ @Subscribe
+ override def awareOfAquariumEx(event: AquariumCreatedEvent) {
+ super.awareOfAquariumEx(event)
- doConfigure()
+ aquarium.eventBus.addSubscriber(this)
+
+ doSetup()
}
- private[this] def doConfigure(): Unit = {
+ private[this] def doSetup(): Unit = {
val postNotifier = new PayloadHandlerPostNotifier(logger)
val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
rccc.queueName
))
new RabbitMQConsumer(
+ aquarium,
rccc,
rcHandler,
futureExecutor,
imcc.queueName
))
new RabbitMQConsumer(
+ aquarium,
imcc,
imHandler,
futureExecutor,
}
def start() = {
- aquarium.eventBus.addSubscriber(this)
-
safeStart()
}
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.util.Lifecycle
import gr.grnet.aquarium.actor.ActorRole
+import gr.grnet.aquarium.AquariumAware
/**
*
import com.ckkloverdos.props.Props
import akka.actor.ActorRef
-import gr.grnet.aquarium.Configurable
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
import java.util.concurrent.ConcurrentHashMap
import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
import gr.grnet.aquarium.actor._
-
/**
* All actors are provided locally.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-class SimpleLocalRoleableActorProviderService extends RoleableActorProviderService with Configurable with Loggable {
+class SimpleLocalRoleableActorProviderService
+ extends RoleableActorProviderService
+ with AquariumAwareSkeleton
+ with Configurable
+ with Loggable {
+
private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef]
- private[this] var _props: Props = _
+ @volatile private[this] var _props: Props = _
def propertyPrefix = None
}
private[this] def _newActor(role: ActorRole): ActorRef = {
- val actorRef = akka.actor.Actor.actorOf(role.actorType).start()
+ val actorFactory = (_class: Class[_ <: RoleableActor]) ⇒ {
+ aquarium.newInstance(_class, _class.getName)
+ }
+
+ val actorRef = akka.actor.Actor.actorOf(actorFactory(role.actorType)).start()
val propsMsg = AquariumPropertiesLoaded(this._props)
if(role.canHandleConfigurationMessage(propsMsg)) {
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
import java.util.concurrent.atomic.AtomicBoolean
import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
-import gr.grnet.aquarium.{Configurable, Aquarium}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, Aquarium}
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.store.StoreProvider
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-final class StoreWatcherService extends Lifecycle with Configurable with Loggable {
+final class StoreWatcherService extends Lifecycle with Configurable with AquariumAwareSkeleton with Loggable {
private[this] var _reconnectPeriodMillis = 1000L
private[this] val _pingIsScheduled = new AtomicBoolean(false)
private[this] val _rcIsAlive = new AtomicBoolean(true)
private[this] val _imIsAlive = new AtomicBoolean(true)
- def aquarium = Aquarium.Instance
-
-
def propertyPrefix = Some(StoreProvider.Prefix)
/**
this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis)
}
- private[this] def safePingStore(tag: Tag,
- pinger: () ⇒ Any,
- getStatus: () ⇒ Boolean,
- setStatus: (Boolean) ⇒ Any): Unit = {
+ private[this] def safePingStore(
+ tag: Tag,
+ pinger: () ⇒ Any,
+ getStatus: () ⇒ Boolean,
+ setStatus: (Boolean) ⇒ Any
+ ): Unit = {
+
try {
val wasAlive = getStatus()
pinger()
}
}
- private[this] def doSchedulePing(tag: Tag,
- info: String,
- pinger: () ⇒ Any,
- getStatus: () ⇒ Boolean,
- setStatus: (Boolean) ⇒ Any): Unit = {
+ private[this] def doSchedulePing(
+ tag: Tag,
+ info: String,
+ pinger: () ⇒ Any,
+ getStatus: () ⇒ Boolean,
+ setStatus: (Boolean) ⇒ Any
+ ): Unit = {
+
aquarium.timerService.scheduleOnce(
info,
{
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.service.event
+
+import gr.grnet.aquarium.Aquarium
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final case class AquariumCreatedEvent(aquarium: Aquarium) extends BusEvent
def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent
def clearResourceEvents(): Unit = {
- // This method is implemented only in MemStore.
+ // This method is implemented only in MemStoreProvider.
throw new AquariumInternalError("Unsupported operation")
}
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-class MemStore extends UserStateStore
+class MemStoreProvider extends UserStateStore
with Configurable with PolicyStore
with ResourceEventStore with IMEventStore
with StoreProvider {
"PolicyEntry" -> _policyEntries.size
)
- "MemStore(%s)" format map
+ "MemStoreProvider(%s)" format map
}
//+ StoreProvider
}
}
-object MemStore {
+object MemStoreProvider {
final def isLocalIMEvent(event: IMEventModel) = event match {
case _: MemIMEvent ⇒ true
case _ ⇒ false
-version = 0.0.2-SNAPSHOT
-
-# Location of the Aquarium accounting policy config file. If commented
-# out, Aquarium will look for the file policy.yaml first at the program
-# starting directory and then fall back to the classpath.
-aquarium.policy=policy.yaml
-
-# Location of the file that defines the mappings between
-# user roles and agreements
-aquarium.role-agreement.map=role-agreement.map
+version = 0.2.0-SNAPSHOT
### Queue related settings
# active-active mode.
rabbitmq.servers=localhost
-# Comma separated list of rabbitmq servers to use. The servers must be in an
-# active-active mode.
+# Port for connecting to the AMQP server
rabbitmq.port=5672
# User name for connecting with the AMQP server
# Passwd for connecting with the AMQP server
rabbitmq.passwd=guest
+# Exchange used by Aquarium to publish messages
+rabbitmq.exchange=aquarium
+
# Virtual host on the AMQP server
rabbitmq.vhost=/
mongodb.connection.pool.size=20
# Relative to AQUARIUM_HOME or an absolute path
-events.store.folder=../events-store
+# DO NOT set this in production
+#events.store.folder=../events-store
# Store resource events to events.store.folder as well
-events.store.save.rc.events=true
+events.store.save.rc.events=false
# Store IM events to events.store.folder as well
-events.store.save.im.events=true
+events.store.save.im.events=false
# How often do we attempt a reconnection to the store(s)?
anystore.reconnect.period.millis=1000
# Actor subsystem
actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
-
# Class that initializes the REST service
rest.service.class=gr.grnet.aquarium.service.RESTActorService
-
# Store subsystem
-store.provider.class=gr.grnet.aquarium.store.mongodb.MemStore
-
+store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
# Override the user store (if present, it will not be given by the store provider above)
-user.state.store.class=gr.grnet.aquarium.store.memory.MemStore
-
-# Override the event store (if present, it will not be given by the store provider above)
-resource.event.store.class=gr.grnet.aquarium.store.memory.MemStore
-
+#user.state.store.class=gr.grnet.aquarium.store.memory.MemStore
+# Override the event store (if present, it will not be given by the store provider above)
+#resource.event.store.class=
# Override the user event store (if present, it will not be given by the store provider above)
-user.event.store.class=gr.grnet.aquarium.store.memory.MemStore
-
+#user.event.store.class=
# Override the policy store (if present, it will not be given by the store provider above)
-policy.store.class=gr.grnet.aquarium.store.memory.MemStore
-
-# The lower mark for the UserActors' LRU.
-user.actor.LRU.lower.mark=800
-
-# The upper mark for the UserActors' LRU.
-user.actors.LRU.upper.mark=1000
-
-# A time period in milliseconds for which we can tolerate stale data regarding user state.
-user.state.timestamp.threshold=10000
-
-# Comma separated list of exchanges known to aquarium
-rabbitmq.exchange=aquarium
-
-# This is an absolute constant for the lifetime of an Aquarium installation.
-# 1 means that every second counts
-time.unit.in.seconds = 1
-
-# Save unparsed user events to user event store
-ack.unparsed.event.im=false
+#policy.store.class=
# Administrative REST API authorization cookie
admin.cookie=1
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.logic.test
-
-import org.junit.Test
-import org.junit.Assert._
-import gr.grnet.aquarium.{StoreConfigurator}
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.logic.accounting.Policy
-import java.io.File
-
-/**
- * Tests for the Policy resolution algorithms
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class PolicyTest extends DSLTestBase with StoreConfigurator {
-
- @Test
- def testReloadPolicies: Unit = {
-
- def copyModifyFile(from: String, to: String) = {
- val extra = " - agreement:\n overrides: default\n name: foobar"
- val out = new java.io.BufferedWriter(new java.io.FileWriter(to) );
- io.Source.fromFile(from).getLines.map(x => x + "\n").foreach(s => out.write(s,0,s.length));
- out.write(extra)
- out.close()
- }
-
- //Initial policy file, read from class path
- Policy.withConfigurator(configurator)
- val pol = Policy.policies.get
-
- /*val f = Policy.policyFile
- assertTrue(f.exists)
-
- //Touch the file to trigger reloading with non changed state
- Thread.sleep(200)
- f.setLastModified(TimeHelpers.nowMillis)
- var polNew = Policy.reloadPolicies
-
- assertEquals(pol.keys.size, polNew.keys.size)
- //assertEquals(pol.keys.head, polNew.keys.head)
-
- //Copy the file and add a new element -> new policy
- val fileCopy = new File(f.getParent, "policy.yaml.old")
- f.renameTo(fileCopy)
- copyModifyFile(fileCopy.getAbsolutePath,
- (new File(fileCopy.getParent, "policy.yaml")).getAbsolutePath)
-
- polNew = Policy.reloadPolicies
- assertEquals(pol.keys.size + 1, polNew.keys.size)
- val policyEffectivities = Policy.policies.get.keySet.toList.sortWith((x,y) => if (y.from after x.from) true else false)
- testSuccessiveTimeslots(policyEffectivities)
- testNoGaps(policyEffectivities)
- */
- }
-
- @Test
- def testLoadStore: Unit = {
- before
-
- val policies = configurator.policyStore
- policies.storePolicyEntry(this.dsl.toPolicyEntry)
-
- val copy1 = this.dsl.copy(algorithms = List())
- policies.storePolicyEntry(copy1.toPolicyEntry)
-
- val copy2 = this.dsl.copy(pricelists = List())
- policies.storePolicyEntry(copy2.toPolicyEntry)
-
- var pol = policies.loadPolicyEntriesAfter(TimeHelpers.nowMillis())
- assert(pol.isEmpty)
-
- pol = policies.loadPolicyEntriesAfter(0)
- assertEquals(3, pol.size)
- assertEquals(pol.head.policyYAML, this.dsl.toYAML)
- assertEquals(pol.tail.head.policyYAML, copy1.toYAML)
- assertEquals(pol.tail.tail.head.policyYAML, copy2.toYAML)
- }
-}
import gr.grnet.aquarium.util.makeString
import gr.grnet.aquarium.converter.StdConverters
import net.liftweb.json.JsonAST.{JValue, JInt}
-import gr.grnet.aquarium.{AquariumException, LogicTestsAssumptions, Aquarium}
+import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, AquariumException, LogicTestsAssumptions, Aquarium}
/**
*
assumeTrue(LogicTestsAssumptions.EnableSprayTests)
// Initialize configuration subsystem
- val aquarium = Aquarium.Instance
+ val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build()
aquarium.start()
- val port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr(
- throw new AquariumException("No %s specified in aquarium properties".format(Aquarium.Keys.rest_port)))
+
+ val port = aquarium.restPort
val dialog = SprayHttpDialog("localhost", port)
val pingReq = HttpRequest(method = GET, uri = "/ping", headers = HttpHeader("Content-Type", "text/plain; charset=UTF-8")::Nil)
}
}
- aquarium.stopWithDelay(1000)
+ aquarium.stopAfterMillis(1000)
}
}
\ No newline at end of file
package gr.grnet.aquarium.user
-import gr.grnet.aquarium.store.memory.MemStore
+import gr.grnet.aquarium.store.memory.MemStoreProvider
import gr.grnet.aquarium.logic.accounting.dsl._
import gr.grnet.aquarium.logic.accounting.Policy
import gr.grnet.aquarium.util.{Loggable, ContextualLogger}
import gr.grnet.aquarium.uid.{UIDGenerator, ConcurrentVMLocalUIDGenerator}
import org.junit.{Assert, Ignore, Test}
import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler}
-import gr.grnet.aquarium.AquariumException
-import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance}
+import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder, AquariumException}
import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, MonthlyBillingCalculation}
import gr.grnet.aquarium.util.date.MutableDateCalc
import gr.grnet.aquarium.computation.BillingMonthInfo
DiskspacePriceUnit
)
- val aquarium = AquariumInstance.withStoreProviderClass(classOf[MemStore])
- Policy.withConfigurator(aquarium)
- val StoreProvider = aquarium.storeProvider
- val ResourceEventStore = StoreProvider.resourceEventStore
+ val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).
+ update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+ build()
+
+ Policy.withConfigurator(aquarium) // FIXME
+
+ val ResourceEventStore = aquarium.resourceEventStore
val Computations = aquarium.userStateComputations
val policyOccurredMillis = policyDateCalc.toMillis
val policyValidFromMillis = policyDateCalc.copy.goPreviousYear.toMillis
val policyValidToMillis = policyDateCalc.copy.goNextYear.toMillis
- StoreProvider.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis, policyValidToMillis))
+ aquarium.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis,
+ policyValidToMillis))
- val Aquarium = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), StoreProvider.resourceEventStore)
- val DefaultResourcesMap = Aquarium.resourcesMap
+ val AquariumSim_ = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), aquarium.resourceEventStore)
+ val DefaultResourcesMap = AquariumSim_.resourcesMap
- val UserCKKL = Aquarium.newUser("CKKL", UserCreationDate)
+ val UserCKKL = AquariumSim_.newUser("CKKL", UserCreationDate)
// val InitialUserState = UserState.createInitialUserState(
// userID = UserCKKL.userID,