Refactor Aquarium to make it more configurable
authorChristos KK Loverdos <loverdos@gmail.com>
Fri, 22 Jun 2012 13:07:56 +0000 (16:07 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Fri, 22 Jun 2012 13:07:56 +0000 (16:07 +0300)
- No Aquarium singleton any more.
- Aquarium is bootstrapped and configured using a builder.
- Services are wired a bit differently right now. Needs more testing, in
order to reach previous runtime stability.

30 files changed:
pom.xml
src/main/resources/aquarium.properties
src/main/scala/gr/grnet/aquarium/Aquarium.scala
src/main/scala/gr/grnet/aquarium/AquariumAware.scala [moved from src/test/scala/gr/grnet/aquarium/logic/test/RoleAgreementsTest.scala with 54% similarity]
src/main/scala/gr/grnet/aquarium/AquariumAwareSkeleton.scala [moved from src/test/scala/gr/grnet/aquarium/StoreConfigurator.scala with 69% similarity]
src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/Main.scala
src/main/scala/gr/grnet/aquarium/ResourceLocator.scala
src/main/scala/gr/grnet/aquarium/actor/RoleableActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/rest/RESTActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/computation/state/parts/RoleHistoryItem.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/main/scala/gr/grnet/aquarium/service/EventBusService.scala
src/main/scala/gr/grnet/aquarium/service/RESTActorService.scala
src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala
src/main/scala/gr/grnet/aquarium/service/RoleableActorProviderService.scala
src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala
src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala
src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala [moved from src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala with 98% similarity]
src/test/resources/aquarium.properties
src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala [deleted file]
src/test/scala/gr/grnet/aquarium/rest/actor/RESTActorTest.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

diff --git a/pom.xml b/pom.xml
index fc3b07b..2c24817 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>gr.grnet</groupId>
   <artifactId>aquarium</artifactId>
-  <version>0.2-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
 
   <!-- Project details-->
   <name>Aquarium</name>
index eab7427..def2921 100644 (file)
@@ -1,13 +1,4 @@
-version = 0.0.2-SNAPSHOT
-
-# Location of the Aquarium accounting policy config file. If commented
-# out, Aquarium will look for the file policy.yaml first at the program
-# starting directory and then fall back to the classpath.
-aquarium.policy = policy.yaml
-
-# Location of the file that defines the mappings between
-# user roles and agreements
-aquarium.role-agreement.map=role-agreement.map
+version = 0.2.0-SNAPSHOT
 
 ### Queue related settings
 
@@ -27,6 +18,9 @@ rabbitmq.username=guest
 # Passwd for connecting with the AMQP server
 rabbitmq.passwd=guest
 
+# Exchnage used by Aquarium to publish messages
+rabbitmq.exchange=aquarium
+
 # Virtual host on the AMQP server
 rabbitmq.vhost=/
 
@@ -94,19 +88,5 @@ store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
 # Override the user event store (if present, it will not be given by the store provider above)
 #policy.store.class=
 
-# The lower mark for the UserActors' LRU.
-user.actor.LRU.lower.mark=800
-# The upper mark for the UserActors' LRU.
-user.actors.LRU.upper.mark=1000
-
-# A time period in milliseconds for which we can tolerate stale data regarding user state.
-user.state.timestamp.threshold=10000
-
-# Exchnage used by Aquarium to publish messages
-rabbitmq.exchange=aquarium
-
-# Save unparsed user events to user event store
-ack.unparsed.event.im=false
-
 # Administrative REST API authorization cookie
 admin.cookie=1
\ No newline at end of file
index 52a3749..68941bf 100644 (file)
 
 package gr.grnet.aquarium
 
-import java.io.File
-
-import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import com.ckkloverdos.maybe._
+import com.ckkloverdos.env.Env
+import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
 import com.ckkloverdos.props.Props
-import com.ckkloverdos.sys.SysProp
-
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf}
-import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.service._
-import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
+import java.io.File
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import gr.grnet.aquarium.service.{RoleableActorProviderService, StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
+import com.ckkloverdos.convert.Converters
 import java.util.concurrent.atomic.AtomicBoolean
-import gr.grnet.aquarium.ResourceLocator._
+import org.slf4j.{LoggerFactory, Logger}
+import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
 import gr.grnet.aquarium.computation.UserStateComputations
-import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
+import com.ckkloverdos.maybe._
+import gr.grnet.aquarium.ResourceLocator._
 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
 import gr.grnet.aquarium.logic.accounting.Policy
-import org.slf4j.{Logger, LoggerFactory}
+import com.ckkloverdos.sys.SysProp
 
 /**
- * This is the Aquarium entry point.
- *
- * Responsible to load all of application configuration and provide the relevant services.
  *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
+ * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariumSelf ⇒
-  import Aquarium.Keys
+
+final class Aquarium(env: Env) extends Lifecycle with Loggable {
+  import Aquarium.EnvKeys
 
   private[this] val _isStopping = new AtomicBoolean(false)
 
@@ -91,199 +88,17 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu
     getClientLogger(client).warn(fmt.format(args: _*))
   }
 
-  /**
-   * Reflectively provide a new instance of a class and configure it appropriately.
-   */
-  private[this] def newInstance[C : Manifest](_className: String = ""): C = {
-    val className = _className match {
-      case "" ⇒
-        manifest[C].erasure.getName
-
-      case name ⇒
-        name
-    }
-
-    val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
-    instanceM match {
-      case Just(instance) ⇒ instance match {
-        case configurable: Configurable ⇒
-          val localProps = configurable.propertyPrefix match {
-            case Some(prefix) ⇒
-              props.subsetForKeyPrefix(prefix)
-
-            case None ⇒
-              props
-          }
-
-          logger.debug("Configuring {} with props", configurable.getClass.getName)
-          MaybeEither(configurable configure localProps) match {
-            case Just(_) ⇒
-              logger.info("Configured {} with props", configurable.getClass.getName)
-              instance
-
-            case Failed(e) ⇒
-              throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
-          }
-
-        case _ ⇒
-          instance
-      }
-
-      case Failed(e) ⇒
-        throw new AquariumInternalError("Could not instantiate %s".format(className), e)
-    }
-
-  }
-
-  private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler
-
-  private[this] lazy val _userStateComputations = new UserStateComputations(aquariumSelf)
-
-  private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
-
-  /**
-   * Initializes a store provider, according to the value configured
-   * in the configuration file. The
-   */
-  private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
-  
-  private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
-
-  private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
-    // If there is a specific `UserStateStore` implementation specified in the
-    // properties, then this implementation overrides the user store given by
-    // `StoreProvider`.
-    props.get(Keys.user_state_store_class) map { className ⇒
-      val instance = newInstance[UserStateStore](className)
-      logger.info("Overriding %s provisioning. Implementation given by: %s".format(
-        shortNameOfClass(classOf[UserStateStore]),
-        instance.getClass))
-      instance
-    }
-  }
-
-  private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
-    // If there is a specific `EventStore` implementation specified in the
-    // properties, then this implementation overrides the event store given by
-    // `StoreProvider`.
-    props.get(Keys.resource_event_store_class) map { className ⇒
-      val instance = newInstance[ResourceEventStore](className)
-      logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
-      instance
-    }
-  }
-
-  private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
-    props.get(Keys.user_event_store_class) map { className ⇒
-      val instance = newInstance[IMEventStore](className)
-      logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
-      instance
-    }
-  }
-
-  private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
-    props.get(Keys.policy_store_class) map {
-      className ⇒
-        val instance = newInstance[PolicyStore](className)
-        logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
-        instance
-    }
-  }
-
-  private[this] lazy val _eventsStoreFolder: Maybe[File] = {
-    props.get(Keys.events_store_folder) map {
-      folderName ⇒
-        logger.info("{} = {}", Keys.events_store_folder, folderName)
-        
-        val canonicalFolder = {
-          val folder = new File(folderName)
-          if(folder.isAbsolute) {
-            folder.getCanonicalFile
-          } else {
-            logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder)
-            new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile
-          }
-        }
-
-        val canonicalPath = canonicalFolder.getCanonicalPath
-
-        if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
-          throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
-        }
-
-        // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
-        // we still want to keep the events.
-        val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
-        if(canonicalPath.startsWith(ahCanonicalPath)) {
-          throw new AquariumException(
-            "%s = %s is under Aquarium Home = %s".format(
-              Keys.events_store_folder,
-              canonicalFolder,
-              ahCanonicalPath
-            ))
-        }
-
-        canonicalFolder.mkdirs()
-
-        canonicalFolder
+  @throws(classOf[AquariumInternalError])
+  def apply[T: Manifest](key: TypedKey[T]): T = {
+    try {
+     env.getEx(key)
+    } catch {
+      case e: Exception ⇒
+        throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
     }
   }
 
-  private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false)
-
-  private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false)
-
-  private[this] lazy val _converters = StdConverters.AllConverters
-
-  private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]()
-
-  private[this] lazy val _akka = newInstance[AkkaService]()
-
-  private[this] lazy val _eventBus = newInstance[EventBusService]()
-
-  private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
-
-  private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]()
-
-  private[this] lazy val _allServices = List(
-    _timerService,
-    _akka,
-    _actorProvider,
-    _eventBus,
-    _restService,
-    _rabbitmqService,
-    _storeWatcherService
-  )
-
-  def get(key: String, default: String = ""): String = props.getOr(key, default)
-
-  def defaultClassLoader = Thread.currentThread().getContextClassLoader
-
-  /**
-   * FIXME: This must be ditched.
-   * 
-   * Find a file whose location can be overiden in
-   * the configuration file (e.g. policy.yaml)
-   *
-   * @param name Name of the file to search for
-   * @param prop Name of the property that defines the file path
-   * @param default Name to return if no file is found
-   */
-  def findConfigFile(name: String, prop: String, default: String): File = {
-    // Check for the configured value first
-    val configured = new File(get(prop))
-    if (configured.exists)
-      return configured
-
-    // Look into the configuration context
-    ResourceLocator.getResource(name) match {
-      case Just(policyResource) ⇒
-        val path = policyResource.url.getPath
-        new File(path)
-      case _ ⇒
-        new File(default)
-    }
-  }
+  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
 
   private[this] def startServices(): Unit = {
     for(service ← _allServices) {
@@ -303,12 +118,7 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu
     }
   }
 
-  def stopWithDelay(millis: Long) {
-    Thread sleep millis
-    stop()
-  }
-
-  private[this] def configure(): Unit = {
+  private[this] def showBasicConfiguration(): Unit = {
     logger.info("Aquarium Home = %s".format(
       if(Homes.Folders.AquariumHome.isAbsolute)
         Homes.Folders.AquariumHome
@@ -317,7 +127,7 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu
     ))
 
     for(folder ← this.eventsStoreFolder) {
-      logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder)
+      logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
     }
     this.eventsStoreFolder.throwMe // on error
 
@@ -343,59 +153,71 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu
     }))
   }
 
-  def start() = {
+  def start(): Unit = {
     this._isStopping.set(false)
-    configure()
+    showBasicConfiguration()
     addShutdownHooks()
     startServices()
   }
 
-  def stop() = {
+  def stop(): Unit = {
     this._isStopping.set(true)
     stopServices()
   }
 
-  def algorithmCompiler = _algorithmCompiler
-
-  def userStateComputations = _userStateComputations
-
-  def converters = _converters
-  
-  def actorProvider = _actorProvider
-
-  def eventBus = _eventBus
-
-  def timerService = _timerService
-
-  def userStateStore = {
-    _userStateStoreM match {
-      case Just(us) ⇒ us
-      case _        ⇒ storeProvider.userStateStore
-    }
+  /**
+   * Stops Aquarium after the given millis. Used during testing.
+   */
+  def stopAfterMillis(millis: Long) {
+    Thread sleep millis
+    stop()
   }
 
-  def resourceEventStore = {
-    _resourceEventStoreM match {
-      case Just(es) ⇒ es
-      case _        ⇒ storeProvider.resourceEventStore
-    }
-  }
+  /**
+   * Reflectively provide a new instance of a class and configure it appropriately.
+   */
+  def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
+    val originalProps = apply(EnvKeys.originalProps)
 
-  def imEventStore = {
-    _imEventStoreM match {
-      case Just(es) ⇒ es
-      case _        ⇒ storeProvider.imEventStore
-    }
-  }
+    val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
+    instanceM match {
+      case Just(instance) ⇒
+        eventBus.addSubscriber[C](instance)
+
+        instance match {
+          case configurable: Configurable if (originalProps ne null) ⇒
+            val localProps = configurable.propertyPrefix match {
+              case somePrefix @ Some(prefix) ⇒
+                if(prefix.length == 0) {
+                  logger.warn(
+                    "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
+                }
+
+                originalProps.subsetForKeyPrefix(prefix)
+
+              case None ⇒
+                originalProps
+            }
+
+            logger.debug("Configuring {} with props", configurable.getClass.getName)
+            MaybeEither(configurable configure localProps) match {
+              case Just(_) ⇒
+                logger.info("Configured {} with props", configurable.getClass.getName)
+                instance
+
+              case Failed(e) ⇒
+                throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
+            }
+
+          case _ ⇒
+            instance
+        }
 
-  def policyStore = {
-    _policyStoreM match {
-      case Just(es) ⇒ es
-      case _        ⇒ storeProvider.policyStore
+      case Failed(e) ⇒
+        throw new AquariumInternalError("Could not instantiate %s".format(className), e)
     }
-  }
 
-  def storeProvider = _storeProvider
+  }
 
   def currentResourcesMap: DSLResourcesMap = {
     // FIXME: Get rid of this singleton stuff
@@ -416,24 +238,42 @@ final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariu
     // FIXME: Read from properties?
     "default"
   }
-  
-  def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
-    val map = this.props.map
-    val newMap = map.updated(Keys.store_provider_class, spc.getName)
-    val newProps = new Props(newMap)
-    new Aquarium(newProps)
-  }
 
-  def eventsStoreFolder = _eventsStoreFolder
+  def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
 
-  def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events
+  def resourceEventStore = apply(EnvKeys.resourceEventStore)
 
-  def saveIMEventsToEventsStoreFolder = _events_store_save_im_events
+  def imEventStore = apply(EnvKeys.imEventStore)
 
-  def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match {
-    case just @ Just(_) ⇒ just
-    case _ ⇒ NoVal
-  }
+  def userStateStore = apply(EnvKeys.userStateStore)
+
+  def policyStore = apply(EnvKeys.policyStore)
+
+  def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
+
+  def algorithmCompiler = apply(EnvKeys.algorithmCompiler)
+
+  def eventBus = apply(EnvKeys.eventBus)
+
+  def userStateComputations = apply(EnvKeys.userStateComputations)
+
+  def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
+
+  def adminCookie = apply(EnvKeys.adminCookie)
+
+  def converters = apply(EnvKeys.converters)
+
+  def actorProvider = apply(EnvKeys.actorProvider)
+
+  def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
+
+  def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
+
+  def timerService = apply(EnvKeys.timerService)
+
+  def restPort = apply(EnvKeys.restPort)
+
+  def version = apply(EnvKeys.version)
 }
 
 object Aquarium {
@@ -451,137 +291,68 @@ object Aquarium {
     SysProp.FileEncoding
   )
 
-  implicit val DefaultConverters = TheDefaultConverters
-
-  final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML
-
-  final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP
-
-  final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource
-
-  final lazy val AquariumProperties = {
-    val maybeProps = Props(AquariumPropertiesResource)
-    maybeProps match {
-      case Just(props) ⇒
-        props
-
-      case NoVal ⇒
-        throw new AquariumInternalError(
-          "Could not load %s from %s".format(
-            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
-            AquariumPropertiesResource))
-
-
-      case Failed(e) ⇒
-        throw new AquariumInternalError(
-          "Could not load %s from %s".format(
-            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
-            AquariumPropertiesResource),
-          e)
-    }
-  }
-
-  /**
-   * The main [[gr.grnet.aquarium.Aquarium]] instance.
-   */
-  final lazy val Instance = {
-    Maybe(new Aquarium(AquariumProperties)) match {
-      case Just(masterConf) ⇒
-        masterConf
-
-      case NoVal ⇒
-        throw new AquariumInternalError(
-          "Could not create Aquarium configuration from %s".format(
-            AquariumPropertiesResource))
-
-      case Failed(e) ⇒
-        throw new AquariumInternalError(
-          "Could not create Aquarium configuration from %s".format(
-            AquariumPropertiesResource),
-          e)
-    }
-  }
-
-  /**
-   * Defines the names of all the known keys inside the master properties file.
-   */
-  final object Keys {
+  object HTTP {
+   final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
+   final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
+ }
+
+  final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
+    override def toString = name
+  }
+
+  final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
+    EnvKeys.timerService,
+    EnvKeys.akkaService,
+    EnvKeys.actorProvider,
+    EnvKeys.eventBus,
+    EnvKeys.restService,
+    EnvKeys.rabbitMQService,
+    EnvKeys.storeWatcherService
+  )
 
+  object EnvKeys {
     /**
      * The Aquarium version. Will be reported in any due occasion.
      */
-    final val version = "version"
+    final val version = StringKey("version")
 
-    /**
-     * The fully qualified name of the class that implements the `RoleableActorProviderService`.
-     * Will be instantiated reflectively and should have a public default constructor.
-     */
-    final val actor_provider_class = "actor.provider.class"
-
-    /**
-     * The class that initializes the REST service
-     */
-    final val rest_service_class = "rest.service.class"
+    final val originalProps: TypedKey[Props] =
+      new AquariumEnvKey[Props]("originalProps")
 
     /**
      * The fully qualified name of the class that implements the `StoreProvider`.
      * Will be instantiated reflectively and should have a public default constructor.
      */
-    final val store_provider_class = "store.provider.class"
-
-    /**
-     * The class that implements the User store
-     */
-    final val user_state_store_class = "user.state.store.class"
+    final val storeProvider: TypedKey[StoreProvider] =
+      new AquariumEnvKey[StoreProvider]("store.provider.class")
 
     /**
-     * The class that implements the resource event store
-     */
-    final val resource_event_store_class = "resource.event.store.class"
-
-    /**
-     * The class that implements the IM event store
-     */
-    final val user_event_store_class = "user.event.store.class"
-
-    /**
-     * The class that implements the wallet entries store
-     */
-    final val policy_store_class = "policy.store.class"
-
-
-    /** The lower mark for the UserActors' LRU.
-     *
-     * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
+     * If a value is given to this property, then it represents a folder where all events coming to aquarium are
+     * saved.
      *
+     * This is for debugging purposes.
      */
-    final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
+    final val eventsStoreFolder: TypedKey[Option[File]] =
+      new AquariumEnvKey[Option[File]]("events.store.folder")
 
     /**
-     * The upper mark for the UserActors' LRU.
+     * If this is `true` and `events.store.folder` is defined, then all resource events are
+     * also stored in `events.store.folder`.
      *
-     * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
+     * This is for debugging purposes.
      */
-    final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
 
-    /**
-     * REST service listening port.
-     *
-     * Default is 8080.
-     */
-    final val rest_port = "rest.port"
+    final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
 
     /**
-     * Location of the Aquarium accounting policy config file
+     * If this is `true` and `events.store.folder` is defined, then all IM events are
+     * also stored in `events.store.folder`.
+     *
+     * This is for debugging purposes.
      */
-    final val aquarium_policy = "aquarium.policy"
+    final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
 
     /**
-     * Location of the role-agreement mapping file
-     */
-    final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
-    
-    /**
      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
      *
      * The smaller the value, the more accurate the user credits and other state parts are.
@@ -590,56 +361,73 @@ object Aquarium {
      * the timestamp of the last known balance amount by this value, then a re-computation for
      * the balance is triggered.
      */
-    final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
+    final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
 
     /**
-     * The time unit is the lowest billable time period.
-     * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
-     * seconds, then the user will be billed for ten seconds.
+     * REST service listening port.
      *
-     * This is an overall constant. We use it as a property in order to prepare ourselves for
-     * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
-     * infrastructures.
+     * Default is 8080.
      */
-    final val time_unit_in_millis = "time.unit.in.seconds"
+    final val restPort = IntKey("rest.port")
 
     /**
-     * If a value is given to this property, then it represents a folder where all events coming to aquarium are
-     * saved.
+     * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
+     * an authorised client.
      */
-    final val events_store_folder = "events.store.folder"
+    final val adminCookie: TypedKey[Option[String]] =
+      new AquariumEnvKey[Option[String]]("admin.cookie")
 
-    /**
-     * If this is `true` and `events.store.folder` is defined, then all resource events are
-     * also stored in `events.store.folder`.
-     *
-     * This is for debugging purposes.
-     */
-    final val events_store_save_rc_events = "events.store.save.rc.events"
+    final val resourceEventStore: TypedKey[ResourceEventStore] =
+      new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
 
-    /**
-     * If this is `true` and `events.store.folder` is defined, then all IM events are
-     * also stored in `events.store.folder`.
-     *
-     * This is for debugging purposes.
-     */
-    final val events_store_save_im_events = "events.store.save.im.events"
+    final val imEventStore: TypedKey[IMEventStore] =
+      new AquariumEnvKey[IMEventStore]("im.event.store.class")
+
+    final val userStateStore: TypedKey[UserStateStore] =
+      new AquariumEnvKey[UserStateStore]("user.state.store.class")
+
+    final val policyStore: TypedKey[PolicyStore] =
+      new AquariumEnvKey[PolicyStore]("policy.store.class")
 
     /**
-     * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
-     * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
+     * The class that initializes the REST service
      */
-    final val save_unparsed_event_im = "save.unparsed.event.im"
+    final val restService: TypedKey[Lifecycle] =
+      new AquariumEnvKey[Lifecycle]("rest.service.class")
 
     /**
-     * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
-     * an authorised client.
+     * The fully qualified name of the class that implements the `RoleableActorProviderService`.
+     * Will be instantiated reflectively and should have a public default constructor.
      */
-    final val admin_cookie = "admin.cookie"
-  }
+    final val actorProvider: TypedKey[RoleableActorProviderService] =
+      new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
+
+    final val akkaService: TypedKey[AkkaService] =
+      new AquariumEnvKey[AkkaService]("akka.service")
+
+    final val eventBus: TypedKey[EventBusService] =
+      new AquariumEnvKey[EventBusService]("event.bus.service")
+
+    final val timerService: TypedKey[TimerService] =
+      new AquariumEnvKey[TimerService]("timer.service")
+
+    final val rabbitMQService: TypedKey[RabbitMQService] =
+      new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
+
+    final val storeWatcherService: TypedKey[StoreWatcherService] =
+      new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
+
+    final val converters: TypedKey[Converters] =
+      new AquariumEnvKey[Converters]("converters")
+
+    final val algorithmCompiler: TypedKey[CostPolicyAlgorithmCompiler] =
+      new AquariumEnvKey[CostPolicyAlgorithmCompiler]("algorithm.compiler")
+
+    final val userStateComputations: TypedKey[UserStateComputations] =
+      new AquariumEnvKey[UserStateComputations]("user.state.computations")
+
+    final val defaultClassLoader: TypedKey[ClassLoader] =
+      new AquariumEnvKey[ClassLoader]("default.class.loader")
 
-  object HTTP {
-    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
-    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
   }
 }
  * or implied, of GRNET S.A.
  */
 
-package gr.grnet.aquarium.logic.test
-
-import org.junit.Assert._
-import org.junit.{Test}
-import io.Source
-import gr.grnet.aquarium.util.TestMethods
-import gr.grnet.aquarium.logic.accounting.{Policy, RoleAgreements}
+package gr.grnet.aquarium
 
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
 
 /**
- * Tests for the [[gr.grnet.aquarium.logic.accounting.RoleAgreements]] class
  *
- * @author Georgios Gousios <gousiosg@gmail.com>
+ * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-class RoleAgreementsTest extends TestMethods {
-
-  @Test
-  def testParseMappings {
-
-    var a = """
-
-    # Some useless comment
-student=foobar # This should be ignored (no default policy)
-prof=default
-    name=default
-%asd=default  # This should be ignored (non accepted char)
-*=default
-      """
-
-    var src = Source.fromBytes(a.getBytes())
-    var output = RoleAgreements.parseMappings(src)
-
-    assertEquals(3, output.size)
-    assertEquals("default", output.getOrElse("prof",null).name)
-
-    // No default value
-    a = """
-    prof=default
-    """
-    src = Source.fromBytes(a.getBytes())
-    assertThrows[RuntimeException](RoleAgreements.parseMappings(src))
-  }
-
-  @Test
-  def testLoadMappings {
-    // Uses the roles-agreements.map file in test/resources
-    RoleAgreements.loadMappings
-
-    assertEquals("default", RoleAgreements.agreementForRole("student").name)
 
-    // Check that default policies are applied
-    assertEquals("default", RoleAgreements.agreementForRole("foobar").name)
-  }
+trait AquariumAware {
+  def awareOfAquariumEx(event: AquariumCreatedEvent): Unit
 }
 
 package gr.grnet.aquarium
 
-import store.memory.MemStore
-import store.mongodb.MongoDBStoreProvider
-import util.Loggable
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import com.google.common.eventbus.Subscribe
 
 /**
- * Returns the appropriate Configurator implementation depending on value
- * of the test.store runtime parameter.
  *
- * @author Georgios Gousios <gousiosg@gmail.com>
+ * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-trait StoreConfigurator extends Loggable {
 
-  def configurator: Aquarium =
-    LogicTestsAssumptions.propertyValue(PropertyNames.TestStore) match {
-      case "mem" => Aquarium.Instance.withStoreProviderClass(classOf[MemStore])
-      case "mongo" => Aquarium.Instance.withStoreProviderClass(classOf[MongoDBStoreProvider])
-      case _ =>
-        logger.warn("Unknown store type, defaulting to \"mem\"")
-        Aquarium.Instance.withStoreProviderClass(classOf[MemStore])
+trait AquariumAwareSkeleton extends AquariumAware {
+  @volatile protected var _aquarium: Aquarium = null
+
+  final def aquarium = _aquarium
+
+  @Subscribe
+  def awareOfAquariumEx(event: AquariumCreatedEvent) = {
+    this._aquarium = event.aquarium
   }
 }
diff --git a/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala b/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala
new file mode 100644 (file)
index 0000000..d135648
--- /dev/null
@@ -0,0 +1,415 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium
+
+import com.ckkloverdos.key.{BooleanKey, TypedKey}
+import com.ckkloverdos.env.Env
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.maybe.{MaybeOption, Failed, MaybeEither, Just, NoVal}
+import gr.grnet.aquarium.util.Loggable
+import java.io.File
+import gr.grnet.aquarium.store.StoreProvider
+import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.computation.UserStateComputations
+import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, AkkaService, SimpleTimerService, EventBusService}
+import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+
+/**
+ * Create a tailored Aquarium.
+ *
+ * Thread-unsafe.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class AquariumBuilder(val originalProps: Props) extends Loggable {
+  if(originalProps eq null) {
+    throw new AquariumInternalError("props is null")
+  }
+
+  import Aquarium.EnvKeys
+
+  private[this] var _env = Env()
+  // This is special
+  private[this] val eventBus = new EventBusService
+
+  @volatile
+  private[this] var _aquarium: Aquarium = _
+
+  @throws(classOf[AquariumInternalError])
+  private def propsGetEx(key: String): String = {
+    try {
+     originalProps.getEx(key)
+    } catch {
+      case e: Exception ⇒
+        throw new AquariumInternalError("Could not locate %s in Aquarium properties".format(key))
+    }
+  }
+
+  @throws(classOf[AquariumInternalError])
+  private def envGetEx[T: Manifest](key: TypedKey[T]): T = {
+    try {
+     _env.getEx(key)
+    } catch {
+      case e: Exception ⇒
+        throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
+    }
+  }
+
+  def update[T: Manifest](keyvalue: (TypedKey[T], T)): this.type = {
+    assert(keyvalue ne null, "keyvalue ne null")
+
+    _env += keyvalue
+    this
+  }
+
+  def update[T : Manifest](key: TypedKey[T], value: T): this.type = {
+    assert(key ne null, "key ne null")
+
+    this update (key -> value)
+  }
+
+  /**
+   * Reflectively provide a new instance of a class and configure it appropriately.
+   */
+  private[this] def newInstance[C <: AnyRef](manifest: Manifest[C], className: String): C = {
+    val defaultClassLoader = Thread.currentThread().getContextClassLoader
+    val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
+    instanceM match {
+      case Just(instance) ⇒
+        eventBus.addSubscriber(instance)
+
+        instance match {
+          case configurable: Configurable if (originalProps ne null) ⇒
+            val localProps = configurable.propertyPrefix match {
+              case somePrefix @ Some(prefix) ⇒
+                if(prefix.length == 0) {
+                  logger.warn(
+                    "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
+                }
+
+                originalProps.subsetForKeyPrefix(prefix)
+
+              case None ⇒
+                originalProps
+            }
+
+            logger.debug("Configuring {} with props", configurable.getClass.getName)
+            MaybeEither(configurable configure localProps) match {
+              case Just(_) ⇒
+                logger.info("Configured {} with props", configurable.getClass.getName)
+                instance
+
+              case Failed(e) ⇒
+                throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
+            }
+
+          case _ ⇒
+            instance
+        }
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError("Could not instantiate %s".format(className), e)
+    }
+  }
+
+  private[this] def checkStoreProviderOverride: Unit = {
+    val envKey = EnvKeys.storeProvider
+    if(_env.contains(envKey)) {
+      return
+    }
+
+    if(originalProps eq null) {
+      throw new AquariumInternalError("Cannot locate store provider, since no properties have been defined")
+    }
+
+    val propName = envKey.name
+    originalProps.get(propName) match {
+      case Just(propValue) ⇒
+        update(envKey, newInstance(envKey.keyType, propValue))
+
+      case NoVal ⇒
+        throw new AquariumInternalError("No store provider is given in properties")
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+    }
+  }
+
+  private[this] def checkStoreOverrides: Unit = {
+    if(originalProps eq null) {
+      return
+    }
+
+    def checkOverride[S <: AnyRef : Manifest](envKey: TypedKey[S], f: StoreProvider ⇒ S): Unit = {
+      if(!_env.contains(envKey)) {
+        val propName = envKey.name
+
+        originalProps.get(propName) match {
+          case Just(propValue) ⇒
+            // Create the store reflectively
+            update(envKey, newInstance(envKey.keyType, propValue))
+
+          case NoVal ⇒
+            // Get the store from the store provider
+            val storeProvider = this.envGetEx(EnvKeys.storeProvider)
+            val propValue = f(storeProvider)
+            update(envKey, propValue)
+
+          case Failed(e) ⇒
+            throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+        }
+      }
+    }
+
+    // If a store has not been specifically overridden, we load it from the properties
+    checkOverride(EnvKeys.resourceEventStore, _.resourceEventStore)
+    checkOverride(EnvKeys.imEventStore,       _.imEventStore)
+    checkOverride(EnvKeys.userStateStore,     _.userStateStore)
+    checkOverride(EnvKeys.policyStore,        _.policyStore)
+  }
+
+  private[this] def checkEventsStoreFolderOverride: Unit = {
+    val propName = EnvKeys.eventsStoreFolder.name
+
+    _env.get(EnvKeys.eventsStoreFolder) match {
+      case Just(storeFolderOption) ⇒
+        // Some value has been set, even a None, so do nothing more
+        logger.info("{} = {}", propName, storeFolderOption)
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError(e, "While obtaining value for env key %s".format(propName))
+
+      case NoVal ⇒
+        if(originalProps eq null) {
+          update(EnvKeys.eventsStoreFolder, None)
+          return
+        }
+
+        // load from props
+        for(folderName ← originalProps.get(propName)) {
+          logger.info("{} = {}", propName, folderName)
+
+          update(EnvKeys.eventsStoreFolder, Some(new File(folderName)))
+        }
+
+    }
+  }
+
+  private[this] def checkEventsStoreFolderExistence: Unit = {
+    val propName = EnvKeys.eventsStoreFolder.name
+    for(folder ← this.envGetEx(EnvKeys.eventsStoreFolder)) {
+      val canonicalFolder = {
+        if(folder.isAbsolute) {
+          folder.getCanonicalFile
+        } else {
+          logger.info("{} is not absolute, making it relative to Aquarium Home", propName)
+          new File(ResourceLocator.Homes.Folders.AquariumHome, folder.getPath).getCanonicalFile
+        }
+      }
+
+      val canonicalPath = canonicalFolder.getCanonicalPath
+
+      if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
+        throw new AquariumInternalError("%s = %s is not a folder".format(propName, canonicalFolder))
+      }
+
+      // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
+      // we still want to keep the events.
+      val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
+      if(canonicalPath.startsWith(ahCanonicalPath)) {
+        throw new AquariumInternalError(
+          "%s = %s is under Aquarium Home = %s".format(
+            propName,
+            canonicalFolder,
+            ahCanonicalPath
+          ))
+      }
+
+      canonicalFolder.mkdirs()
+
+      update(EnvKeys.eventsStoreFolder, Some(canonicalFolder))
+    }
+  }
+
+  private[this] def checkEventsStoreFolderVariablesOverrides: Unit = {
+    def checkVar(envKey: BooleanKey): Unit = {
+      if(!_env.contains(envKey)) {
+        val propName = envKey.name
+        originalProps.getBoolean(propName) match {
+          case Just(propValue) ⇒
+            update(envKey, propValue)
+
+          case NoVal ⇒
+            update(envKey, false)
+
+          case Failed(e) ⇒
+            throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+        }
+      }
+    }
+
+    checkVar(EnvKeys.eventsStoreSaveRCEvents)
+    checkVar(EnvKeys.eventsStoreSaveIMEvents)
+  }
+
+  private[this] def checkRestServiceOverride: Unit = {
+    checkNoPropsOverride(EnvKeys.restService) { envKey ⇒
+      val envKey    = EnvKeys.restService
+      val propName  = envKey.name
+      val propValue = propsGetEx(propName)
+
+      newInstance(envKey.keyType, propValue)
+    }
+  }
+
+  private[this] def checkNoPropsOverride[T: Manifest](envKey: TypedKey[T])(f: TypedKey[T] ⇒ T): Unit = {
+    if(_env.contains(envKey)) {
+      return
+    }
+
+    update(envKey, f(envKey))
+  }
+
+  private[this] def checkPropsOverride[T: Manifest](envKey: TypedKey[T])(f: (TypedKey[T], String) ⇒ T): Unit = {
+    if(_env.contains(envKey)) {
+      return
+    }
+
+    val propName = envKey.name
+    originalProps.get(propName) match {
+      case Just(propValue) ⇒
+        update(envKey, f(envKey, propValue))
+
+      case NoVal ⇒
+        throw new AquariumInternalError("No value for key %s in properties".format(propName))
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+    }
+  }
+
+  private[this] def checkOptionalPropsOverride[T: Manifest]
+      (envKey: TypedKey[Option[T]])
+      (f: (TypedKey[Option[T]], String) ⇒ Option[T]): Unit = {
+
+    if(_env.contains(envKey)) {
+      return
+    }
+
+    val propName = envKey.name
+    originalProps.get(propName) match {
+      case Just(propValue) ⇒
+        update(envKey, f(envKey, propValue))
+
+      case NoVal ⇒
+        update(envKey, None)
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError(e, "While obtaining value for key %s in properties".format(propName))
+    }
+  }
+
+  def build(): Aquarium = {
+    if(this._aquarium ne null) {
+      return this._aquarium
+    }
+
+    checkPropsOverride(EnvKeys.version) { (envKey, propValue) ⇒ propValue }
+
+    checkNoPropsOverride(EnvKeys.eventBus) { _ ⇒ eventBus }
+
+    checkNoPropsOverride(EnvKeys.originalProps) { _ ⇒ originalProps }
+
+    checkNoPropsOverride(EnvKeys.defaultClassLoader) { _ ⇒  Thread.currentThread().getContextClassLoader }
+
+    checkNoPropsOverride(EnvKeys.converters) { _ ⇒ StdConverters.AllConverters }
+
+    checkStoreProviderOverride
+    checkStoreOverrides
+
+    checkEventsStoreFolderOverride
+    checkEventsStoreFolderExistence
+    checkEventsStoreFolderVariablesOverrides
+
+    checkRestServiceOverride
+
+    checkNoPropsOverride(EnvKeys.timerService) { envKey ⇒
+      newInstance(envKey.keyType, classOf[SimpleTimerService].getName)
+    }
+
+    checkNoPropsOverride(EnvKeys.algorithmCompiler) { _ ⇒ SimpleCostPolicyAlgorithmCompiler }
+
+    checkNoPropsOverride(EnvKeys.userStateComputations) { envKey ⇒
+      newInstance(envKey.keyType, classOf[UserStateComputations].getName)
+    }
+
+    checkNoPropsOverride(EnvKeys.akkaService) { envKey ⇒
+      newInstance(envKey.keyType, classOf[AkkaService].getName)
+    }
+
+    checkNoPropsOverride(EnvKeys.rabbitMQService) { envKey ⇒
+      newInstance(envKey.keyType, classOf[RabbitMQService].getName)
+    }
+
+    checkNoPropsOverride(EnvKeys.storeWatcherService) { envKey ⇒
+      newInstance(envKey.keyType, classOf[StoreWatcherService].getName)
+    }
+
+    checkPropsOverride(EnvKeys.actorProvider) { (envKey, propValue) ⇒
+      newInstance(envKey.keyType, propValue)
+    }
+
+    checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒
+      propValue.toLong
+    }
+
+    checkPropsOverride(EnvKeys.restPort) { (envKey, propValue) ⇒
+      propValue.toInt
+    }
+
+    checkOptionalPropsOverride(EnvKeys.adminCookie) { (envKey, propValue) ⇒
+      Some(propValue)
+    }
+
+    this._aquarium = new Aquarium(_env)
+
+    this._aquarium.eventBus.syncPost(AquariumCreatedEvent(this._aquarium))
+
+    this._aquarium
+  }
+}
index eae67a8..bfb52e7 100644 (file)
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory
 import ch.qos.logback.classic.LoggerContext
 import ch.qos.logback.classic.joran.JoranConfigurator
 import com.ckkloverdos.maybe.Just
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
 
 /**
  * Main method for Aquarium
@@ -63,19 +64,6 @@ object Main extends LazyLoggable {
     }
   }
 
-  def doStart(): Unit = {
-    import ResourceLocator.SysEnvs
-
-    // We have AKKA builtin, so no need to mess with pre-existing installation.
-    if(SysEnvs.AKKA_HOME.value.isJust) {
-      val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME))
-      logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
-      throw error
-    }
-
-    Aquarium.Instance.start()
-  }
-
   def main(args: Array[String]) = {
     configureLogging()
 
@@ -83,9 +71,11 @@ object Main extends LazyLoggable {
     logStarting("Aquarium")
     val ms0 = TimeHelpers.nowMillis()
     try {
-      doStart()
+      val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build()
+      aquarium.start()
+
       val ms1 = TimeHelpers.nowMillis()
-      logStarted(ms0, ms1, "Aquarium")
+      logStarted(ms0, ms1, "Aquarium [%s]", aquarium.version)
       logSeparator()
     } catch {
       case e: Throwable ⇒
index 4f1f028..4f4bb44 100644 (file)
@@ -42,6 +42,11 @@ import java.io.File
 import gr.grnet.aquarium.util.justForSure
 import gr.grnet.aquarium.util.isRunningTests
 import com.ckkloverdos.resource.{FileStreamResource, StreamResource, CompositeStreamResourceContext, ClassLoaderStreamResourceContext, FileStreamResourceContext}
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.convert.Converters._
+import com.ckkloverdos.maybe.Just
+import com.ckkloverdos.maybe.Failed
+import com.ckkloverdos.convert.Converters
 
 /**
  * Locates resources.
@@ -267,6 +272,30 @@ object ResourceLocator {
     }
   }
 
+  final lazy val AquariumProperties = {
+    implicit val DefaultConverters = Converters.DefaultConverters
+    val maybeProps = Props(Resources.AquariumPropertiesResource)
+    maybeProps match {
+      case Just(props) ⇒
+        props
+
+      case NoVal ⇒
+        throw new AquariumInternalError(
+          "Could not load %s from %s".format(
+            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+            Resources.AquariumPropertiesResource))
+
+
+      case Failed(e) ⇒
+        throw new AquariumInternalError(
+          "Could not load %s from %s".format(
+            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+            Resources.AquariumPropertiesResource),
+          e)
+    }
+  }
+
+
   def getResource(what: String): Maybe[StreamResource] = {
     ResourceContexts.MasterResourceContext.getResource(what)
   }
index 5e1b41c..eb92f67 100644 (file)
@@ -45,7 +45,7 @@ import util.{Loggable, shortClassNameOf}
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-trait RoleableActor extends Actor with Loggable {
+trait RoleableActor extends Actor with Loggable with AquariumAwareSkeleton {
   def role: ActorRole
 
   override def toString = "%s@%s(%s)".format(shortClassNameOf(this), System.identityHashCode(this), role.role)
index deb14b8..d730281 100644 (file)
@@ -46,13 +46,12 @@ import gr.grnet.aquarium.actor.{RESTRole, RoleableActor, RouterRole}
 import RESTPaths._
 import gr.grnet.aquarium.util.date.TimeHelpers
 import org.joda.time.format.ISODateTimeFormat
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
-import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, ActorMessage, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest}
 import gr.grnet.aquarium.{ResourceLocator, Aquarium}
 import com.ckkloverdos.resource.StreamResource
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import com.ckkloverdos.maybe.Failed
 import java.net.InetAddress
-import gr.grnet.aquarium.event.model.{ExternalEventModel, EventModel}
+import gr.grnet.aquarium.event.model.ExternalEventModel
 
 /**
  * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
@@ -67,9 +66,6 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
   final val TEXT_PLAIN       = "text/plain"
   final val APPLICATION_JSON = "application/json"
 
-
-  private[this] def aquarium = Aquarium.Instance
-
   private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = {
     HttpResponse(
       status,
@@ -124,7 +120,7 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
   )(  f: RequestResponder ⇒ Unit): Unit = {
 
     aquarium.adminCookie match {
-      case Just(adminCookie) ⇒
+      case Some(adminCookie) ⇒
         headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match {
           case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒
             try f(responder)
@@ -143,7 +139,7 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
             responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
         }
 
-      case NoVal ⇒
+      case None ⇒
         responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN))
     }
   }
@@ -250,7 +246,6 @@ class RESTActor private(_id: String) extends RoleableActor with Loggable {
 
   private[this]
   def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
-    val aquarium = Aquarium.Instance
     val actorProvider = aquarium.actorProvider
     val router = actorProvider.actorForRole(RouterRole)
     val futureResponse = router ask message
index 0f20383..7ee1c7a 100644 (file)
@@ -46,12 +46,11 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf, shortNameOfClass, shortNameOfType}
-import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival, InitialUserStateSetup}
-import gr.grnet.aquarium.{AquariumInternalError, Aquarium}
+import gr.grnet.aquarium.computation.reason.{RealtimeBillingCalculation, InitialUserActorSetup, UserStateChangeReason, IMEventArrival}
+import gr.grnet.aquarium.AquariumInternalError
 import gr.grnet.aquarium.computation.state.parts.IMStateSnapshot
 import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
 
 /**
  *
@@ -83,8 +82,6 @@ class UserActor extends ReflectiveRoleableActor {
 
   def role = UserActorRole
 
-  private[this] def aquarium: Aquarium = Aquarium.Instance
-
   private[this] def userStateComputations = aquarium.userStateComputations
 
   private[this] def stdUserStateStoreFunc = (userState: UserState) ⇒ {
@@ -92,7 +89,7 @@ class UserActor extends ReflectiveRoleableActor {
   }
 
   private[this] def _timestampTheshold = {
-    aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(1000L * 60 * 5 /* 5 minutes */)
+    aquarium.userStateTimestampThreshold
   }
 
   private[this] def haveUserState = {
index 4c7bf28..0ee314d 100644 (file)
@@ -42,24 +42,22 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
 import gr.grnet.aquarium.computation.state.parts._
 import gr.grnet.aquarium.event.model.NewWalletEntry
 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
-import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, AquariumAware, AquariumInternalError}
 import gr.grnet.aquarium.computation.reason.{MonthlyBillingCalculation, InitialUserStateSetup, UserStateChangeReason}
 import gr.grnet.aquarium.computation.state.{UserStateWorker, UserStateBootstrap, UserState}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import com.google.common.eventbus.Subscribe
 
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-final class UserStateComputations(_aquarium: => Aquarium) extends Loggable {
-
-  lazy val aquarium = _aquarium
-
-  lazy val storeProvider         = aquarium.storeProvider
-  lazy val timeslotComputations  = new TimeslotComputations {}
+final class UserStateComputations extends AquariumAwareSkeleton with Loggable {
+  lazy val timeslotComputations  = new TimeslotComputations {} // FIXME
   lazy val algorithmCompiler     = aquarium.algorithmCompiler
-  lazy val policyStore           = storeProvider.policyStore
-  lazy val userStateStoreForRead = storeProvider.userStateStore
-  lazy val resourceEventStore    = storeProvider.resourceEventStore
+  lazy val policyStore           = aquarium.policyStore
+  lazy val userStateStoreForRead = aquarium.userStateStore
+  lazy val resourceEventStore    = aquarium.resourceEventStore
 
   def findUserStateAtEndOfBillingMonth(
       userStateBootstrap: UserStateBootstrap,
index eed9cde..59d2344 100644 (file)
@@ -60,18 +60,9 @@ case class RoleHistoryItem(
      */
     validTo: Long = Long.MaxValue
 ) {
-
-  try {
-    require(
-      validFrom <= validTo,
-      "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
-  }
-  catch {
-    case e: IllegalArgumentException ⇒
-      // TODO Remove this
-      Aquarium.Instance.debug(this, "!! validFrom = %s, validTo = %s, dx=%s", validFrom, validTo, validTo-validFrom)
-      throw e
-  }
+  require(
+    validFrom <= validTo,
+    "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
 
   require(name ne null, "Name is not null")
 
index 76eb837..80c1413 100644 (file)
@@ -54,22 +54,24 @@ import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.{RabbitMQConKeys,
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
-
-                       /**
-                        * Specifies what we do with the message payload.
-                        */
-                       handler: PayloadHandler,
-
-                       /**
-                        * Specifies how we execute the handler
-                        */
-                       executor: PayloadHandlerExecutor,
-
-                       /**
-                        * After the payload is processed, we call this function with ourselves and the result.
-                        */
-                       notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
+class RabbitMQConsumer(
+    val aquarium: Aquarium,
+    val conf: RabbitMQConsumerConf,
+
+    /**
+     * Specifies what we do with the message payload.
+     */
+    handler: PayloadHandler,
+
+    /**
+     * Specifies how we execute the handler
+     */
+    executor: PayloadHandlerExecutor,
+
+    /**
+     * After the payload is processed, we call this function with ourselves and the result.
+     */
+    notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
 ) extends Loggable with Lifecycle { consumerSelf ⇒
 
   private[this] var _factory: ConnectionFactory = _
@@ -126,7 +128,7 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
   case object LifecycleStartReason extends StartReason
   case object PingStartReason extends StartReason
 
-  private[this] def timerService = Aquarium.Instance.timerService
+  private[this] def timerService = aquarium.timerService
 
   private[this] lazy val servers = {
     conf.connectionConf(RabbitMQConKeys.servers)
@@ -261,8 +263,6 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
     safeStop()
   }
 
-  private[this] def aquarium = Aquarium.Instance
-
   private[this] def postBusError(event: BusEvent): Unit = {
     aquarium.eventBus ! event
   }
index be6f4c6..deee38f 100644 (file)
 package gr.grnet.aquarium.logic.accounting
 
 import dsl.{Timeslot, DSLPolicy, DSL}
-import gr.grnet.aquarium.Aquarium._
 import java.io.{InputStream, FileInputStream, File}
 import java.util.Date
 import gr.grnet.aquarium.util.Loggable
 import java.util.concurrent.atomic.AtomicReference
-import gr.grnet.aquarium.Aquarium.Keys
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
 import collection.immutable.{TreeMap, SortedMap}
-import gr.grnet.aquarium.{AquariumException, Aquarium}
+import gr.grnet.aquarium.{ResourceLocator, AquariumAwareSkeleton, AquariumException, Aquarium}
 import gr.grnet.aquarium.util.date.{MutableDateCalc, TimeHelpers}
+import com.ckkloverdos.maybe.{Failed, Just}
 
 /**
  * Searches for and loads the applicable accounting policy
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-object Policy extends DSL with Loggable {
+object Policy extends DSL with Loggable with AquariumAwareSkeleton {
 
   /* Pointer to the latest policy */
   private[logic] lazy val policies = {
@@ -163,7 +161,7 @@ object Policy extends DSL with Loggable {
    */
   private[logic] def reloadPolicies: SortedMap[Timeslot, DSLPolicy] =
     if (config == null)
-      reloadPolicies(Instance)
+      reloadPolicies(aquarium)
     else
       reloadPolicies(config)
 
@@ -174,24 +172,24 @@ object Policy extends DSL with Loggable {
 
     //2. Check whether policy file has been updated
     val latestPolicyChange = if (pol.isEmpty) 0 else pol.last.validFrom
-    val policyf = Instance.findConfigFile(PolicyConfName, Keys.aquarium_policy, PolicyConfName)
+
+    val policyf = ResourceLocator.Resources.PolicyYAMLResource
     var updated = false
 
     if (policyf.exists) {
-      if (policyf.lastModified > latestPolicyChange) {
-        logger.info("Policy file updated since last check, reloading")
         updated = true
-      } else {
-        logger.info("Policy file not changed since last check")
-      }
-    } else {
+     } else {
       logger.warn("User specified policy file %s does not exist, " +
-        "using stored policy information".format(policyf.getAbsolutePath))
+        "using stored policy information".format(policyf.url))
     }
 
     if (updated) {
       val ts = TimeHelpers.nowMillis()
-      val parsedNew = loadPolicyFromFile(policyf)
+      val parsedNewM = policyf.mapInputStream(parse).toMaybeEither
+      val parsedNew = parsedNewM match {
+        case Just(parsedNew) ⇒ parsedNew
+        case Failed(e)       ⇒ throw e
+      }
       val newPolicy = parsedNew.toPolicyEntry
 
       config.policyStore.findPolicyEntry(newPolicy.id) match {
diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/RoleAgreements.scala
deleted file mode 100644 (file)
index ea26311..0000000
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.logic.accounting
-
-import dsl.DSLAgreement
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.Aquarium
-import gr.grnet.aquarium.Aquarium.{Instance, Keys}
-import io.Source
-import java.io.{InputStream, File}
-import java.util.regex.Pattern
-
-/**
- * Encapsulates mappings from user roles to Aquarium policies. The mappings
- * are used during new user registration to automatically set a policy to
- * a user according to its role.
- *
- * The configuration is read from a configuration file pointed to by the
- * main Aquarium configuration file. The
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-object RoleAgreements extends Loggable {
-
-  private var mappings: Map[String, DSLAgreement] = loadMappings
-
-  /**
-   * Returns the agreement that matches the provided role. The search for a
-   * matching agreement is done with the current version of the Aquarium
-   * policy.
-   */
-  def agreementForRole(role : String) = mappings.get(role.toLowerCase) match {
-    case Some(x) => x
-    case None => mappings.get("*").getOrElse(
-      throw new RuntimeException("Cannot find agreement for default role *"))
-  }
-
-  /**
-   * Trigger reloading of the mappings file.
-   */
-  def reloadMappings = mappings = loadMappings
-
-  /**
-   * Load and parse the mappings file
-   */
-  private[logic] def loadMappings = synchronized {
-    val config = Aquarium.Instance.get(Keys.aquarium_role_agreement_map)
-    val configFile = Aquarium.Instance.findConfigFile(
-      Aquarium.RolesAgreementsName, Keys.aquarium_role_agreement_map,
-      Aquarium.RolesAgreementsName)
-
-    def loadFromClasspath: Source = {
-      getClass.getClassLoader.getResourceAsStream(Aquarium.RolesAgreementsName) match {
-        case x: InputStream =>
-          logger.warn("Using default role to agreement mappings, this is " +
-            "problably not what you want")
-          Source.fromInputStream(x)
-        case null =>
-          logger.error("No valid role to agreement mappings configuration found, " +
-            "Aquarium will fail")
-          null
-      }
-    }
-
-    val source = if (configFile.exists && configFile.isFile) {
-        if (configFile.isFile)
-          Source.fromFile(configFile)
-        else {
-          logger.warn(("Configured file %s is a directory. " +
-            "Trying the default one.").format(config))
-          loadFromClasspath
-        }
-    } else {
-        logger.warn("Configured file %s for role-agreement mappings cannot " +
-          "be found. Trying the default one.".format(config))
-        loadFromClasspath
-    }
-
-    parseMappings(source)
-  }
-
-  def parseMappings(src: Source) = {
-    val p = Pattern.compile("^\\s*([\\*a-zA-Z0-9-_]+)\\s*=\\s*([a-zA-Z0-9-_]+).*$")
-
-    val mappings = src.getLines.foldLeft(Map[String, DSLAgreement]()) {
-      (acc, l) =>
-        l match {
-          case x if (x.matches("^\\s*$")) => acc
-          case x if (x.matches("^\\s*\\#")) => acc
-          case x if (p.matcher(x).find()) =>
-            // Ugly code warning
-            val m = p.matcher(x)
-            m.find()
-            val role = m.group(1)
-            val agrName = m.group(2)
-            Policy.policy.findAgreement(agrName) match {
-              case Some(x) => acc ++ Map(role -> x)
-              case None =>
-                logger.warn("No agreement with name %s".format(agrName))
-                acc
-            }
-          case _ => acc
-        }
-    }
-    if (!mappings.keysIterator.contains("*"))
-      throw new RuntimeException("Cannot find agreement for default role *")
-    mappings
-  }
-}
index e6eafce..a7b0f42 100644 (file)
@@ -37,7 +37,8 @@ package gr.grnet.aquarium.service
 
 import akka.actor.Actor
 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.ResourceLocator.SysEnvs
+import gr.grnet.aquarium.AquariumInternalError
 
 /**
  * A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
@@ -47,6 +48,12 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 
 final class AkkaService extends Lifecycle with Loggable {
   def start() = {
+    // We have AKKA builtin, so no need to mess with pre-existing installation.
+    if(SysEnvs.AKKA_HOME.value.isJust) {
+      val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME))
+      logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
+      throw error
+    }
   }
 
   def stop()= {
index 1753cce..54f7d03 100644 (file)
@@ -38,7 +38,7 @@ package gr.grnet.aquarium.service
 import gr.grnet.aquarium.Configurable
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.service.event.BusEvent
-import com.google.common.eventbus.{AsyncEventBus, DeadEvent, Subscribe}
+import com.google.common.eventbus.{EventBus, AsyncEventBus, DeadEvent, Subscribe}
 import gr.grnet.aquarium.util.{DaemonThreadFactory, Lifecycle, Loggable}
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.Collections
@@ -51,11 +51,14 @@ import java.util.Collections
  */
 
 class EventBusService extends Loggable with Lifecycle with Configurable {
+  private[this] val className = classOf[EventBusService].getName
   private[this] val asyncBus = new AsyncEventBus(
-    classOf[EventBusService].getName,
+    "%s/async".format(className),
     Executors.newFixedThreadPool(1, new DaemonThreadFactory)
   )
 
+  private[this] val syncBus = new EventBus("%s/sync")
+
   private[this] val subscribers = Collections.newSetFromMap[AnyRef](new ConcurrentHashMap())
 
   def propertyPrefix = None
@@ -75,16 +78,23 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
   def stop() = synchronized {
     val iterator = subscribers.iterator()
     while(iterator.hasNext) {
-      asyncBus.unregister(iterator.next())
+      val subscriber = iterator.next()
+      asyncBus.unregister(subscriber)
+      syncBus.unregister(subscriber)
     }
     subscribers.clear()
   }
 
-  @inline
-  def post[A <: BusEvent](event: A): Unit = {
-    this ! event
+  /**
+   * Posts an event synchronously.
+   */
+  def syncPost[A <: BusEvent](event: A): Unit = {
+    syncBus.post(event)
   }
 
+  /**
+   * Post an event asynchronously.
+   */
   def ![A <: BusEvent](event: A): Unit = {
     asyncBus.post(event)
   }
@@ -92,11 +102,13 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
   def removeSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized {
     subscribers.remove(subscriber)
     asyncBus.unregister(subscriber)
+    syncBus.register(subscriber)
   }
 
   def addSubscriber[A <: AnyRef](subscriber: A): Unit = synchronized {
     subscribers.add(subscriber)
     asyncBus.register(subscriber)
+    syncBus.register(subscriber)
   }
 
   @Subscribe
index 40aefd6..09ce11f 100644 (file)
@@ -39,26 +39,35 @@ import gr.grnet.aquarium.actor.RESTRole
 import _root_.akka.actor._
 import cc.spray.can.{ServerConfig, HttpClient, HttpServer}
 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.{AquariumInternalError, Aquarium}
+import gr.grnet.aquarium.{Configurable, AquariumAwareSkeleton, Aquarium}
+import com.ckkloverdos.props.Props
 
 /**
  * REST service based on Actors and Spray.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-class RESTActorService extends Lifecycle with Loggable {
+class RESTActorService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
   private[this] var _port: Int = 8080
   private[this] var _restActor: ActorRef = _
   private[this] var _serverActor: ActorRef = _
   private[this] var _clientActor: ActorRef = _
 
-  def start(): Unit = {
-    val aquarium = Aquarium.Instance
-    this._port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr(
-      throw new AquariumInternalError(
-        "%s was not specified in Aquarium properties".format(Aquarium.Keys.rest_port)))
 
-    logger.debug("Starting on port %s".format(this._port))
+  def propertyPrefix = Some(RESTActorService.Prefix)
+
+  /**
+   * Configure this instance with the provided properties.
+   *
+   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+   */
+  def configure(props: Props) {
+    this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
+    logger.debug("HTTP port is %s".format(this._port))
+  }
+
+  def start(): Unit = {
+    logger.info("Starting HTTP on port %s".format(this._port))
 
     this._restActor = aquarium.actorProvider.actorForRole(RESTRole)
     // Start Spray subsystem
@@ -67,7 +76,13 @@ class RESTActorService extends Lifecycle with Loggable {
   }
 
   def stop(): Unit = {
+    logger.info("Stopping HTTP on port %s".format(this._port))
+
     this._serverActor.stop()
     this._clientActor.stop()
   }
+}
+
+object RESTActorService {
+  final val Prefix = "rest"
 }
\ No newline at end of file
index c1e42c3..87992ce 100644 (file)
@@ -37,13 +37,13 @@ package gr.grnet.aquarium.service
 
 import com.ckkloverdos.props.Props
 import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{Aquarium, Configurable}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
 import gr.grnet.aquarium.converter.StdConverters
 import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
 import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
 import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
@@ -54,14 +54,12 @@ import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, R
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class RabbitMQService extends Loggable with Lifecycle with Configurable {
+class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
 
   def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
 
-  def aquarium = Aquarium.Instance
-
   def eventBus = aquarium.eventBus
 
   def resourceEventStore = aquarium.resourceEventStore
@@ -79,11 +77,18 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
    */
   def configure(props: Props) = {
     this._props = props
+  }
+
+  @Subscribe
+  override def awareOfAquariumEx(event: AquariumCreatedEvent) {
+    super.awareOfAquariumEx(event)
 
-    doConfigure()
+    aquarium.eventBus.addSubscriber(this)
+
+    doSetup()
   }
 
-  private[this] def doConfigure(): Unit = {
+  private[this] def doSetup(): Unit = {
     val postNotifier = new PayloadHandlerPostNotifier(logger)
 
     val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
@@ -133,6 +138,7 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
         rccc.queueName
       ))
       new RabbitMQConsumer(
+        aquarium,
         rccc,
         rcHandler,
         futureExecutor,
@@ -148,6 +154,7 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
         imcc.queueName
       ))
       new RabbitMQConsumer(
+        aquarium,
         imcc,
         imHandler,
         futureExecutor,
@@ -164,8 +171,6 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
   }
 
   def start() = {
-    aquarium.eventBus.addSubscriber(this)
-
     safeStart()
   }
 
index e092965..5d1556a 100644 (file)
@@ -39,6 +39,7 @@ import akka.actor.ActorRef
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.util.Lifecycle
 import gr.grnet.aquarium.actor.ActorRole
+import gr.grnet.aquarium.AquariumAware
 
 /**
  *
index 4bf7440..6b1f6b8 100644 (file)
@@ -37,22 +37,25 @@ package gr.grnet.aquarium.service
 
 import com.ckkloverdos.props.Props
 import akka.actor.ActorRef
-import gr.grnet.aquarium.Configurable
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
 import java.util.concurrent.ConcurrentHashMap
 import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
 import gr.grnet.aquarium.actor._
 
-
 /**
  * All actors are provided locally.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-class SimpleLocalRoleableActorProviderService extends RoleableActorProviderService with Configurable with Loggable {
+class SimpleLocalRoleableActorProviderService
+  extends RoleableActorProviderService
+     with AquariumAwareSkeleton
+     with Configurable
+     with Loggable {
+
   private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef]
-  private[this] var _props: Props = _
+  @volatile private[this] var _props: Props = _
 
   def propertyPrefix = None
 
@@ -73,7 +76,11 @@ class SimpleLocalRoleableActorProviderService extends RoleableActorProviderServi
   }
 
   private[this] def _newActor(role: ActorRole): ActorRef = {
-    val actorRef = akka.actor.Actor.actorOf(role.actorType).start()
+    val actorFactory = (_class: Class[_ <: RoleableActor]) ⇒ {
+      aquarium.newInstance(_class, _class.getName)
+    }
+
+    val actorRef = akka.actor.Actor.actorOf(actorFactory(role.actorType)).start()
 
     val propsMsg = AquariumPropertiesLoaded(this._props)
     if(role.canHandleConfigurationMessage(propsMsg)) {
index d689795..271a409 100644 (file)
@@ -38,7 +38,7 @@ package gr.grnet.aquarium.service
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
 import java.util.concurrent.atomic.AtomicBoolean
 import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
-import gr.grnet.aquarium.{Configurable, Aquarium}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, Aquarium}
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.store.StoreProvider
 
@@ -48,7 +48,7 @@ import gr.grnet.aquarium.store.StoreProvider
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-final class StoreWatcherService extends Lifecycle with Configurable with Loggable {
+final class StoreWatcherService extends Lifecycle with Configurable with AquariumAwareSkeleton with Loggable {
   private[this] var _reconnectPeriodMillis = 1000L
 
   private[this] val _pingIsScheduled = new AtomicBoolean(false)
@@ -56,9 +56,6 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl
   private[this] val _rcIsAlive = new AtomicBoolean(true)
   private[this] val _imIsAlive = new AtomicBoolean(true)
 
-  def aquarium = Aquarium.Instance
-
-
   def propertyPrefix = Some(StoreProvider.Prefix)
 
   /**
@@ -70,10 +67,13 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl
     this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis)
   }
 
-  private[this] def safePingStore(tag: Tag,
-                                  pinger: () ⇒ Any,
-                                  getStatus: () ⇒ Boolean,
-                                  setStatus: (Boolean) ⇒ Any): Unit = {
+  private[this] def safePingStore(
+      tag: Tag,
+      pinger: () ⇒ Any,
+      getStatus: () ⇒ Boolean,
+      setStatus: (Boolean) ⇒ Any
+  ): Unit = {
+
     try {
       val wasAlive = getStatus()
       pinger()
@@ -92,11 +92,14 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl
     }
   }
 
-  private[this] def doSchedulePing(tag: Tag,
-                                   info: String,
-                                   pinger: () ⇒ Any,
-                                   getStatus: () ⇒ Boolean,
-                                   setStatus: (Boolean) ⇒ Any): Unit = {
+  private[this] def doSchedulePing(
+      tag: Tag,
+      info: String,
+      pinger: () ⇒ Any,
+      getStatus: () ⇒ Boolean,
+      setStatus: (Boolean) ⇒ Any
+  ): Unit = {
+
     aquarium.timerService.scheduleOnce(
       info,
       {
diff --git a/src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala b/src/main/scala/gr/grnet/aquarium/service/event/AquariumCreatedEvent.scala
new file mode 100644 (file)
index 0000000..fe0fc39
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.service.event
+
+import gr.grnet.aquarium.Aquarium
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final case class AquariumCreatedEvent(aquarium: Aquarium) extends BusEvent
index 99f6d2f..72f851f 100644 (file)
@@ -50,7 +50,7 @@ trait ResourceEventStore {
   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent
 
   def clearResourceEvents(): Unit = {
-    // This method is implemented only in MemStore.
+    // This method is implemented only in MemStoreProvider.
     throw new AquariumInternalError("Unsupported operation")
   }
 
@@ -59,7 +59,7 @@ import gr.grnet.aquarium.computation.BillingMonthInfo
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
 
-class MemStore extends UserStateStore
+class MemStoreProvider extends UserStateStore
   with Configurable with PolicyStore
   with ResourceEventStore with IMEventStore
   with StoreProvider {
@@ -87,7 +87,7 @@ class MemStore extends UserStateStore
       "PolicyEntry"         -> _policyEntries.size
     )
 
-    "MemStore(%s)" format map
+    "MemStoreProvider(%s)" format map
   }
 
   //+ StoreProvider
@@ -278,7 +278,7 @@ class MemStore extends UserStateStore
   }
 }
 
-object MemStore {
+object MemStoreProvider {
   final def isLocalIMEvent(event: IMEventModel) = event match {
     case _: MemIMEvent ⇒ true
     case _ ⇒ false
index c1f730a..def2921 100644 (file)
@@ -1,13 +1,4 @@
-version = 0.0.2-SNAPSHOT
-
-# Location of the Aquarium accounting policy config file. If commented
-# out, Aquarium will look for the file policy.yaml first at the program
-# starting directory and then fall back to the classpath.
-aquarium.policy=policy.yaml
-
-# Location of the file that defines the mappings between
-# user roles and agreements
-aquarium.role-agreement.map=role-agreement.map
+version = 0.2.0-SNAPSHOT
 
 ### Queue related settings
 
@@ -18,8 +9,7 @@ rabbitmq.reconnect.period.millis=1000
 # active-active mode.
 rabbitmq.servers=localhost
 
-# Comma separated list of rabbitmq servers to use. The servers must be in an
-# active-active mode.
+# Port for connecting to the AMQP server
 rabbitmq.port=5672
 
 # User name for connecting with the AMQP server
@@ -28,6 +18,9 @@ rabbitmq.username=guest
 # Passwd for connecting with the AMQP server
 rabbitmq.passwd=guest
 
+# Exchnage used by Aquarium to publish messages
+rabbitmq.exchange=aquarium
+
 # Virtual host on the AMQP server
 rabbitmq.vhost=/
 
@@ -65,13 +58,14 @@ mongodb.dbschema=aquarium
 mongodb.connection.pool.size=20
 
 # Relative to AQUARIUM_HOME or an absolute path
-events.store.folder=../events-store
+# DO NOT set this in production
+#events.store.folder=../events-store
 
 # Store resource events to events.store.folder as well
-events.store.save.rc.events=true
+events.store.save.rc.events=false
 
 # Store IM events to events.store.folder as well
-events.store.save.im.events=true
+events.store.save.im.events=false
 
 # How often do we attempt a reconnection to the store(s)?
 anystore.reconnect.period.millis=1000
@@ -82,43 +76,17 @@ anystore.reconnect.period.millis=1000
 
 # Actor subsystem
 actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
-
 # Class that initializes the REST service
 rest.service.class=gr.grnet.aquarium.service.RESTActorService
-
 # Store subsystem
-store.provider.class=gr.grnet.aquarium.store.mongodb.MemStore
-
+store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
 # Override the user store (if present, it will not be given by the store provider above)
-user.state.store.class=gr.grnet.aquarium.store.memory.MemStore
-
-# Override the event store (if present, it will not be given by the store provider above)
-resource.event.store.class=gr.grnet.aquarium.store.memory.MemStore
-
+#user.state.store.class=gr.grnet.aquarium.store.memory.MemStorede the event store (if present, it will not be given by the store provider above)
+#resource.event.store.class=
 # Override the user event store (if present, it will not be given by the store provider above)
-user.event.store.class=gr.grnet.aquarium.store.memory.MemStore
-
+#user.event.store.class=
 # Override the user event store (if present, it will not be given by the store provider above)
-policy.store.class=gr.grnet.aquarium.store.memory.MemStore
-
-# The lower mark for the UserActors' LRU.
-user.actor.LRU.lower.mark=800
-
-# The upper mark for the UserActors' LRU.
-user.actors.LRU.upper.mark=1000
-
-# A time period in milliseconds for which we can tolerate stale data regarding user state.
-user.state.timestamp.threshold=10000
-
-# Comma separated list of exchanges known to aquarium
-rabbitmq.exchange=aquarium
-
-# This is an absolute constant for the lifetime of an Aquarium installation.
-# 1 means that every second counts
-time.unit.in.seconds = 1
-
-# Save unparsed user events to user event store
-ack.unparsed.event.im=false
+#policy.store.class=
 
 # Administrative REST API authorization cookie
 admin.cookie=1
\ No newline at end of file
diff --git a/src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala b/src/test/scala/gr/grnet/aquarium/logic/test/PolicyTest.scala
deleted file mode 100644 (file)
index 533b376..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.logic.test
-
-import org.junit.Test
-import org.junit.Assert._
-import gr.grnet.aquarium.{StoreConfigurator}
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.logic.accounting.Policy
-import java.io.File
-
-/**
- * Tests for the Policy resolution algorithms
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class PolicyTest extends DSLTestBase with StoreConfigurator {
-
-  @Test
-  def testReloadPolicies: Unit = {
-    
-    def copyModifyFile(from: String, to: String) = {
-      val extra = "    - agreement:\n      overrides: default\n      name: foobar"
-      val out = new java.io.BufferedWriter(new java.io.FileWriter(to) );
-      io.Source.fromFile(from).getLines.map(x => x + "\n").foreach(s => out.write(s,0,s.length));
-      out.write(extra)
-      out.close()
-    }
-
-    //Initial policy file, read from class path
-    Policy.withConfigurator(configurator)
-    val pol = Policy.policies.get
-
-    /*val f = Policy.policyFile
-    assertTrue(f.exists)
-
-    //Touch the file to trigger reloading with non changed state
-    Thread.sleep(200)
-    f.setLastModified(TimeHelpers.nowMillis)
-    var polNew = Policy.reloadPolicies
-
-    assertEquals(pol.keys.size, polNew.keys.size)
-    //assertEquals(pol.keys.head, polNew.keys.head)
-
-    //Copy the file and add a new element -> new policy
-    val fileCopy = new File(f.getParent, "policy.yaml.old")
-    f.renameTo(fileCopy)
-    copyModifyFile(fileCopy.getAbsolutePath,
-      (new File(fileCopy.getParent, "policy.yaml")).getAbsolutePath)
-
-    polNew = Policy.reloadPolicies
-    assertEquals(pol.keys.size + 1, polNew.keys.size)
-    val policyEffectivities = Policy.policies.get.keySet.toList.sortWith((x,y) => if (y.from after x.from) true else false)
-    testSuccessiveTimeslots(policyEffectivities)
-    testNoGaps(policyEffectivities)
-    */
-  }
-
-  @Test
-  def testLoadStore: Unit = {
-    before
-
-    val policies = configurator.policyStore
-    policies.storePolicyEntry(this.dsl.toPolicyEntry)
-
-    val copy1 = this.dsl.copy(algorithms = List())
-    policies.storePolicyEntry(copy1.toPolicyEntry)
-
-    val copy2 = this.dsl.copy(pricelists = List())
-    policies.storePolicyEntry(copy2.toPolicyEntry)
-
-    var pol = policies.loadPolicyEntriesAfter(TimeHelpers.nowMillis())
-    assert(pol.isEmpty)
-
-    pol = policies.loadPolicyEntriesAfter(0)
-    assertEquals(3, pol.size)
-    assertEquals(pol.head.policyYAML, this.dsl.toYAML)
-    assertEquals(pol.tail.head.policyYAML, copy1.toYAML)
-    assertEquals(pol.tail.tail.head.policyYAML, copy2.toYAML)
-  }
-}
index e70373a..68b4732 100644 (file)
@@ -46,7 +46,7 @@ import cc.spray.can.{HttpResponse, HttpHeader, HttpRequest}
 import gr.grnet.aquarium.util.makeString
 import gr.grnet.aquarium.converter.StdConverters
 import net.liftweb.json.JsonAST.{JValue, JInt}
-import gr.grnet.aquarium.{AquariumException, LogicTestsAssumptions, Aquarium}
+import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, AquariumException, LogicTestsAssumptions, Aquarium}
 
 /**
  * 
@@ -58,10 +58,10 @@ class RESTActorTest {
     assumeTrue(LogicTestsAssumptions.EnableSprayTests)
     
     // Initialize configuration subsystem
-    val aquarium = Aquarium.Instance
+    val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build()
     aquarium.start()
-    val port = aquarium.props.getInt(Aquarium.Keys.rest_port).getOr(
-      throw new AquariumException("No %s specified in aquarium properties".format(Aquarium.Keys.rest_port)))
+
+    val port = aquarium.restPort
     val dialog = SprayHttpDialog("localhost", port)
 
     val pingReq = HttpRequest(method = GET, uri = "/ping", headers = HttpHeader("Content-Type", "text/plain; charset=UTF-8")::Nil)
@@ -84,6 +84,6 @@ class RESTActorTest {
       }
     }
 
-    aquarium.stopWithDelay(1000)
+    aquarium.stopAfterMillis(1000)
   }
 }
\ No newline at end of file
index 7d9c28c..098783e 100644 (file)
@@ -35,7 +35,7 @@
 
 package gr.grnet.aquarium.user
 
-import gr.grnet.aquarium.store.memory.MemStore
+import gr.grnet.aquarium.store.memory.MemStoreProvider
 import gr.grnet.aquarium.logic.accounting.dsl._
 import gr.grnet.aquarium.logic.accounting.Policy
 import gr.grnet.aquarium.util.{Loggable, ContextualLogger}
@@ -43,8 +43,7 @@ import gr.grnet.aquarium.simulation._
 import gr.grnet.aquarium.uid.{UIDGenerator, ConcurrentVMLocalUIDGenerator}
 import org.junit.{Assert, Ignore, Test}
 import gr.grnet.aquarium.logic.accounting.algorithm.{ExecutableCostPolicyAlgorithm, CostPolicyAlgorithmCompiler}
-import gr.grnet.aquarium.AquariumException
-import gr.grnet.aquarium.Aquarium.{Instance ⇒ AquariumInstance}
+import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder, AquariumException}
 import gr.grnet.aquarium.computation.reason.{NoSpecificChangeReason, MonthlyBillingCalculation}
 import gr.grnet.aquarium.util.date.MutableDateCalc
 import gr.grnet.aquarium.computation.BillingMonthInfo
@@ -128,10 +127,13 @@ aquariumpolicy:
     DiskspacePriceUnit
   )
 
-  val aquarium = AquariumInstance.withStoreProviderClass(classOf[MemStore])
-  Policy.withConfigurator(aquarium)
-  val StoreProvider = aquarium.storeProvider
-  val ResourceEventStore = StoreProvider.resourceEventStore
+  val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).
+    update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+    build()
+
+  Policy.withConfigurator(aquarium) // FIXME
+
+  val ResourceEventStore = aquarium.resourceEventStore
 
   val Computations = aquarium.userStateComputations
 
@@ -236,12 +238,13 @@ aquariumpolicy:
   val policyOccurredMillis  = policyDateCalc.toMillis
   val policyValidFromMillis = policyDateCalc.copy.goPreviousYear.toMillis
   val policyValidToMillis   = policyDateCalc.copy.goNextYear.toMillis
-  StoreProvider.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis, policyValidToMillis))
+  aquarium.policyStore.storePolicyEntry(DefaultPolicy.toPolicyEntry(policyOccurredMillis, policyValidFromMillis,
+    policyValidToMillis))
 
-  val Aquarium = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), StoreProvider.resourceEventStore)
-  val DefaultResourcesMap = Aquarium.resourcesMap
+  val AquariumSim_ = AquariumSim(List(VMTimeResourceSim, DiskspaceResourceSim, BandwidthResourceSim), aquarium.resourceEventStore)
+  val DefaultResourcesMap = AquariumSim_.resourcesMap
 
-  val UserCKKL  = Aquarium.newUser("CKKL", UserCreationDate)
+  val UserCKKL  = AquariumSim_.newUser("CKKL", UserCreationDate)
 
 //  val InitialUserState = UserState.createInitialUserState(
 //    userID = UserCKKL.userID,