WIP: Remodeling events
[aquarium] / src / main / scala / gr / grnet / aquarium / Configurator.scala
index 7c556a3..8811a9f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
 
 package gr.grnet.aquarium
 
-import actor.{ActorProvider}
-import com.ckkloverdos.resource._
-import com.ckkloverdos.sys.SysProp
+import events.im.IMEventModel
+import java.io.File
+
+import com.ckkloverdos.maybe._
 import com.ckkloverdos.props.Props
-import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
-import store._
-import util.{Lifecycle, Loggable}
+
+import gr.grnet.aquarium.service._
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.store._
 
 /**
  * The master configurator. Responsible to load all of application configuration and provide the relevant services.
@@ -64,28 +65,32 @@ class Configurator(val props: Props) extends Loggable {
           Maybe(configurable configure props) match {
             case Just(_) ⇒
               instance
-            case Failed(e, _) ⇒
-              throw new Exception("Could not configure instance of %s".format(className), e)
+            case Failed(e) ⇒
+              throw new AquariumException("Could not configure instance of %s".format(className), e)
             case NoVal ⇒
-              throw new Exception("Could not configure instance of %s".format(className))
+              throw new AquariumException("Could not configure instance of %s".format(className))
           }
         case _ ⇒
           instance
       }
-      case Failed(e, _) ⇒
-        throw new Exception("Could not instantiate %s".format(className), e)
+      case Failed(e) ⇒
+        throw new AquariumException("Could not instantiate %s".format(className), e)
       case NoVal ⇒
-        throw new Exception("Could not instantiate %s".format(className))
+        throw new AquariumException("Could not instantiate %s".format(className))
     }
 
   }
 
-  private[this] lazy val _actorProvider: ActorProvider = {
-    val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
-    logger.info("Loaded ActorProvider: %s".format(instance.getClass))
+  private[this] lazy val _actorProvider: ActorProviderService = {
+    val instance = newInstance[ActorProviderService](props.getEx(Keys.actor_provider_class))
+    logger.info("Loaded ActorProviderService: %s".format(instance.getClass))
     instance
   }
 
+  /**
+   * Initializes a store provider, according to the value configured
+   * in the configuration file. The
+   */
   private[this] lazy val _storeProvider: StoreProvider = {
     val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
     logger.info("Loaded StoreProvider: %s".format(instance.getClass))
@@ -120,10 +125,10 @@ class Configurator(val props: Props) extends Loggable {
     }
   }
 
-  private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
+  private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
     props.get(Keys.user_event_store_class) map { className ⇒
-      val instance = newInstance[UserEventStore](className)
-      logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
+      val instance = newInstance[IMEventStore](className)
+      logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
       instance
     }
   }
@@ -132,34 +137,103 @@ class Configurator(val props: Props) extends Loggable {
     // If there is a specific `IMStore` implementation specified in the
     // properties, then this implementation overrides the event store given by
     // `IMProvider`.
-    props.get(Keys.wallet_entry_store_class) map { className ⇒
-      val instance = newInstance[WalletEntryStore](className)
-      logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
-      instance
+    props.get(Keys.wallet_entry_store_class) map {
+      className ⇒
+        val instance = newInstance[WalletEntryStore](className)
+        logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
+        instance
+    }
+  }
+
+  private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
+    props.get(Keys.policy_store_class) map {
+      className ⇒
+        val instance = newInstance[PolicyStore](className)
+        logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
+        instance
+    }
+  }
+
+  private[this] lazy val _eventsStoreFolder: Maybe[File] = {
+    props.get(Keys.events_store_folder) map {
+      folderName ⇒
+        val canonicalFolder = {
+          val folder = new File(folderName)
+          if(folder.isAbsolute) {
+            folder.getCanonicalFile
+          } else {
+            logger.info("{} is not absolute, making it relative to AQUARIUM_HOME", Keys.events_store_folder)
+            new File(ResourceLocator.AQUARIUM_HOME_FOLDER, folderName).getCanonicalFile
+          }
+        }
+
+        val canonicalPath = canonicalFolder.getCanonicalPath
+
+        logger.info("{} = {}", Keys.events_store_folder, canonicalPath)
+
+        if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
+          throw new AquariumException("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
+        }
+
+        // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
+        // we still want to keep the events.
+        val ahCanonicalPath = ResourceLocator.AQUARIUM_HOME_FOLDER.getCanonicalPath
+        if(canonicalPath.startsWith(ahCanonicalPath)) {
+          throw new AquariumException(
+            "%s = %s is under %s = %s".format(
+              Keys.events_store_folder, canonicalFolder,
+              ResourceLocator.AQUARIUM_HOME.name, ahCanonicalPath
+            ))
+        }
+
+        canonicalFolder.mkdirs()
+
+        canonicalFolder
     }
   }
 
-  private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
+  private[this] lazy val _resEventProc = new ResourceEventProcessorService
+
+  private[this] lazy val _imEventProc = new IMEventProcessorService
 
-  private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
+  private[this] lazy val _lifecycleServices = List(AkkaService, _restService, _actorProvider, _resEventProc, _imEventProc)
 
   def get(key: String, default: String = ""): String = props.getOr(key, default)
 
   def defaultClassLoader = Thread.currentThread().getContextClassLoader
 
+  /**
+   * FIXME: This must be ditched.
+   * 
+   * Find a file whose location can be overiden in
+   * the configuration file (e.g. policy.yaml)
+   *
+   * @param name Name of the file to search for
+   * @param prop Name of the property that defines the file path
+   * @param default Name to return if no file is found
+   */
+  def findConfigFile(name: String, prop: String, default: String): File = {
+    // Check for the configured value first
+    val configured = new File(get(prop))
+    if (configured.exists)
+      return configured
+
+    // Look into the configuration context
+    ResourceLocator.getResource(name) match {
+      case Just(policyResource) ⇒
+        val path = policyResource.url.getPath
+        new File(path)
+      case _ ⇒
+        new File(default)
+    }
+  }
+
   def startServices(): Unit = {
-    _restService.start()
-    _actorProvider.start()
-    _resEventProc.start()
-    _imEventProc.start()
+    _lifecycleServices.foreach(_.start())
   }
 
   def stopServices(): Unit = {
-    _imEventProc.stop()
-    _resEventProc.stop()
-    _restService.stop()
-    _actorProvider.stop()
-
+    _lifecycleServices.reverse.foreach(_.stop())
 //    akka.actor.Actor.registry.shutdownAll()
   }
 
@@ -191,14 +265,35 @@ class Configurator(val props: Props) extends Loggable {
     }
   }
 
-  def userEventStore = {
-    _userEventStoreM match {
+  def imEventStore = {
+    _imEventStoreM match {
       case Just(es) ⇒ es
-      case _        ⇒ storeProvider.userEventStore
+      case _        ⇒ storeProvider.imEventStore
+    }
+  }
+
+  def policyStore = {
+    _policyStoreM match {
+      case Just(es) ⇒ es
+      case _        ⇒ storeProvider.policyStore
     }
   }
 
   def storeProvider = _storeProvider
+  
+  def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
+    val map = this.props.map
+    val newMap = map.updated(Keys.store_provider_class, spc.getName)
+    val newProps = new Props(newMap)
+    new Configurator(newProps)
+  }
+
+  def eventsStoreFolder = _eventsStoreFolder
+
+  def adminCookie: MaybeOption[String] = props.get(Configurator.Keys.admin_cookie) match {
+    case just @ Just(_) ⇒ just
+    case _ ⇒ NoVal
+  }
 }
 
 object Configurator {
@@ -206,61 +301,19 @@ object Configurator {
 
   val MasterConfName = "aquarium.properties"
 
-  /**
-   * Current directory resource context.
-   * Normally this should be the application installation directory.
-   *
-   * It takes priority over `ClasspathBaseResourceContext`.
-   */
-  val AppBaseResourceContext = new FileStreamResourceContext(".")
+  val PolicyConfName = "policy.yaml"
 
-  /**
-   * The venerable /etc resource context
-   */
-  val SlashEtcResourceContext = new FileStreamResourceContext("/etc")
-
-  /**
-   * Class loader resource context.
-   * This has the lowest priority.
-   */
-  val ClasspathBaseResourceContext = new ClassLoaderStreamResourceContext(Thread.currentThread().getContextClassLoader)
-
-  /**
-   * Use this property to override the place where aquarium configuration resides.
-   *
-   * The value of this property is a folder that defines the highest-priority resource context.
-   */
-  val ConfBaseFolderSysProp = SysProp("aquarium.conf.base.folder")
-
-  /**
-   * The resource context used in the application.
-   */
-  lazy val MasterResourceContext = {
-    val rc0 = ClasspathBaseResourceContext
-    val rc1 = AppBaseResourceContext
-    val rc2 = SlashEtcResourceContext
-    val basicContext = new CompositeStreamResourceContext(NoVal, rc2, rc1, rc0)
-    
-    ConfBaseFolderSysProp.value match {
-      case Just(value) ⇒
-        // We have a system override for the configuration location
-        new CompositeStreamResourceContext(Just(basicContext), new FileStreamResourceContext(value))
-      case NoVal ⇒
-        basicContext
-      case Failed(e, m) ⇒
-        throw new RuntimeException(m , e)
-    }
-  }
+  val RolesAgreementsName = "roles-agreements.map"
 
   lazy val MasterConfResource = {
-    val maybeMCResource = MasterResourceContext getResource MasterConfName
+    val maybeMCResource = ResourceLocator getResource MasterConfName
     maybeMCResource match {
       case Just(masterConfResource) ⇒
         masterConfResource
       case NoVal ⇒
-        throw new RuntimeException("Could not find master configuration file: %s".format(MasterConfName))
-      case Failed(e, m) ⇒
-        throw new RuntimeException(m, e)
+        throw new AquariumException("Could not find master configuration file: %s".format(MasterConfName))
+      case Failed(e) ⇒
+        throw new AquariumException(e, "Could not find master configuration file: %s".format(MasterConfName))
     }
   }
 
@@ -270,9 +323,9 @@ object Configurator {
       case Just(props) ⇒
         props
       case NoVal ⇒
-        throw new RuntimeException("Could not load master configuration file: %s".format(MasterConfName))
-      case Failed(e, m) ⇒
-        throw new RuntimeException(m, e)
+        throw new AquariumException("Could not load master configuration file: %s".format(MasterConfName))
+      case Failed(e) ⇒
+        throw new AquariumException(e, "Could not load master configuration file: %s".format(MasterConfName))
     }
   }
 
@@ -281,9 +334,9 @@ object Configurator {
       case Just(masterConf) ⇒
         masterConf
       case NoVal ⇒
-        throw new RuntimeException("Could not initialize master configuration file: %s".format(MasterConfName))
-      case Failed(e, m) ⇒
-        throw new RuntimeException(m, e)
+        throw new AquariumException("Could not initialize master configuration file: %s".format(MasterConfName))
+      case Failed(e) ⇒
+        throw new AquariumException(e, "Could not initialize master configuration file: %s".format(MasterConfName))
     }
   }
 
@@ -291,13 +344,14 @@ object Configurator {
    * Defines the names of all the known keys inside the master properties file.
    */
   final object Keys {
+
     /**
      * The Aquarium version. Will be reported in any due occasion.
      */
     final val version = "version"
 
     /**
-     * The fully qualified name of the class that implements the `ActorProvider`.
+     * The fully qualified name of the class that implements the `ActorProviderService`.
      * Will be instantiated reflectively and should have a public default constructor.
      */
     final val actor_provider_class = "actor.provider.class"
@@ -333,6 +387,12 @@ object Configurator {
      */
     final val wallet_entry_store_class = "wallet.entry.store.class"
 
+    /**
+     * The class that implements the wallet entries store
+     */
+    final val policy_store_class = "policy.store.class"
+
+
     /** The lower mark for the UserActors' LRU, managed by UserActorManager.
      *
      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
@@ -377,7 +437,23 @@ object Configurator {
     /**
      * Comma separated list of exchanges known to aquarium
      */
-    final val amqp_exchanges = "amqp.exchanges"
+    final val amqp_exchange = "amqp.exchange"
+
+    /**
+     * Queues for retrieving resource events from. Multiple queues can be
+     * declared, seperated by semicolon
+     *
+     * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
+     */
+    final val amqp_resevents_queues = "amqp.resevents.queues"
+
+    /**
+     * Queues for retrieving user events from. Multiple queues can be
+     * declared, seperated by semicolon
+     *
+     * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
+     */
+    final val amqp_userevents_queues="amqp.userevents.queues"
 
     /**
      * REST service listening port.
@@ -427,6 +503,11 @@ object Configurator {
     final val aquarium_policy = "aquarium.policy"
 
     /**
+     * Location of the role-agreement mapping file
+     */
+    final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
+    
+    /**
      * A time period in milliseconds for which we can tolerate stale data regarding user state.
      *
      * The smaller the value, the more accurate the user credits and other state data are.
@@ -436,5 +517,39 @@ object Configurator {
      * the balance is triggered.
      */
     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
+
+    /**
+     * The time unit is the lowest billable time period.
+     * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
+     * seconds, then the user will be billed for ten seconds.
+     *
+     * This is an overall constant. We use it as a property in order to prepare ourselves for
+     * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
+     * infrastructures.
+     */
+    final val time_unit_in_millis = "time.unit.in.seconds"
+
+    /**
+     * If a value is given to this property, then it represents a folder where all events coming to aquarium are
+     * stored.
+     */
+    final val events_store_folder = "events.store.folder"
+
+    /**
+     * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.events.im.IMEventModel]] is
+     * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
+     */
+    final val save_unparsed_event_im = "save.unparsed.event.im"
+
+    /**
+     * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
+     * an authorised client.
+     */
+    final val admin_cookie = "admin.cookie"
   }
-}
\ No newline at end of file
+
+  object HTTP {
+    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
+    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
+  }
+}