WIP: New state machine for message processing
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStoreProvider.scala
index d658eca..655b12e 100644 (file)
@@ -38,52 +38,88 @@ package gr.grnet.aquarium.store.mongodb
 import com.ckkloverdos.props.Props
 import com.mongodb.{MongoException, Mongo, MongoOptions, ServerAddress}
 import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.{AquariumException, Configurable}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, AquariumException, Configurable}
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import com.google.common.eventbus.Subscribe
+import com.ckkloverdos.key.{IntKey, StringKey}
+import gr.grnet.aquarium.util.Loggable
 
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class MongoDBStoreProvider extends StoreProvider with Configurable {
-  private[this] var _mongo: Mongo = _
-  private[this] var _database: String = _
-  private[this] var _username: String = _
-  private[this] var _password: String = _
+class MongoDBStoreProvider extends StoreProvider with Configurable with Loggable with AquariumAwareSkeleton {
+  private case class ConnectionData(
+      database: String,
+      host: String,
+      port: Int,
+      username: String,
+      password: String,
+      connectionsPerHost: Int,
+      threadsAllowedToBlockForConnectionMultiplier: Int
+  )
 
+  private[this] var _mongo: Mongo = _
+  private[this] var _connectionData: ConnectionData = _
   private[this] var _mongoDBStore: MongoDBStore = _
 
-  def propertyPrefix = Some(MongoDBStoreProvider.MongoDBKeys.Prefix)
+  def propertyPrefix = Some(MongoDBStoreProvider.Prefix)
 
   def configure(props: Props) = {
-    import MongoDBStoreProvider.MongoDBKeys
+    import MongoDBStoreProvider.EnvKeys
+
+    this._connectionData = ConnectionData(
+      database = props.getEx(EnvKeys.mongodbDatabase.name),
+      host =  props.getEx(EnvKeys.mongodbHost.name),
+      port = props.getIntEx(EnvKeys.mongodbPort.name),
+      username = props.getEx(EnvKeys.mongodbUsername.name),
+      password = props.getEx(EnvKeys.mongodbPassword.name),
+      connectionsPerHost = props.getInt(EnvKeys.mongodbConnectionsPerHost.name).getOr(20),
+      threadsAllowedToBlockForConnectionMultiplier = props.getInt(
+        EnvKeys.mongodbThreadsAllowedToBlockForConnectionMultiplier.name).getOr(5)
+    )
+  }
 
-    this._database = props.getEx(MongoDBKeys.dbschema)
-    this._username = props.getEx(MongoDBKeys.username)
-    this._password = props.getEx(MongoDBKeys.password)
-    val host = props.getEx(MongoDBKeys.host)
-    val port = props.getEx(MongoDBKeys.port).toInt
+  @Subscribe
+  override def awareOfAquarium(event: AquariumCreatedEvent) {
+    super.awareOfAquarium(event)
 
+    doSetup()
+  }
+
+  private def doSetup() {
     try {
-      val addr = new ServerAddress(host, port)
+      val host = this._connectionData.host
+      val port = this._connectionData.port
+      val serverAddress = new ServerAddress(host, port)
 
       val opt = new MongoOptions()
-      opt.connectionsPerHost = props.getEx(MongoDBKeys.connection_pool_size).toInt
-      opt.threadsAllowedToBlockForConnectionMultiplier = 8
-
-      this._mongo = new Mongo(addr, opt)
-      this._mongoDBStore = new MongoDBStore(this._mongo, this._database, this._username, this._password)
+      opt.connectionsPerHost = this._connectionData.connectionsPerHost
+      opt.threadsAllowedToBlockForConnectionMultiplier = this._connectionData.threadsAllowedToBlockForConnectionMultiplier
+
+      this._mongo = new Mongo(serverAddress, opt)
+      this._mongoDBStore = new MongoDBStore(
+        aquarium,
+        this._mongo,
+        this._connectionData.database,
+        this._connectionData.username,
+        this._connectionData.password
+      )
     } catch {
-      case e: MongoException =>
-        throw new AquariumException("Cannot connect to mongo at %s:%s".format(host, port), e)
+      case e: MongoException ⇒
+        throw new AquariumException("While connecting to MongoDB using %s".format(this._connectionData), e)
+
+      case e: Exception ⇒
+        throw new AquariumException("While connecting to MongoDB using %s".format(this._connectionData), e)
     }
   }
 
   def userStateStore = _mongoDBStore
   def resourceEventStore = _mongoDBStore
-  def walletEntryStore = _mongoDBStore
   def imEventStore = _mongoDBStore
   def policyStore = _mongoDBStore
+  def userAgreementHistoryStore = _mongoDBStore
 }
 
 /**
@@ -92,47 +128,36 @@ class MongoDBStoreProvider extends StoreProvider with Configurable {
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 object MongoDBStoreProvider {
+  final val Prefix = "mongodb"
+  final val PrefixAndDot = Prefix + "."
 
-  /**
-   * Note that these keys must be prefixed by `mongodb` in the configuration file
-   *
-   * @author Christos KK Loverdos <loverdos@gmail.com>
-   */
-  object MongoDBKeys {
-    final val Prefix = "mongodb"
-    final val PrefixAndDot = Prefix + "."
-
-    private[this] def p(name: String) = PrefixAndDot + name
-
+  object EnvKeys {
     /**
      * Hostname for the MongoDB
      */
-    final val host = p("host")
+    final val mongodbHost = StringKey(PrefixAndDot + "host")
 
-    /**
-     * Username for connecting to the MongoDB
-     */
-    final val username = p("username")
 
-    /**
-     *  Password for connecting to the MongoDB
-     */
-    final val password = p("password")
+    final val mongodbPort     = IntKey   (PrefixAndDot + "port")
 
     /**
-     *  Password for connecting to the MongoDB
+     * Username for connecting to the MongoDB.
      */
-    final val port = p("port")
+    final val mongodbUsername = StringKey(PrefixAndDot + "username")
 
     /**
-     *  The DB schema to use
+     *  Password for connecting to the MongoDB.
      */
-    final val dbschema = p("dbschema")
+    final val mongodbPassword = StringKey(PrefixAndDot + "password")
 
     /**
-     * Maximum number of open connections to MongoDB
+     *  The MongoDB database to use.
      */
-    final val connection_pool_size = p("connection.pool.size")
+    final val mongodbDatabase = StringKey(PrefixAndDot + "database")
+
+    final val mongodbConnectionsPerHost = IntKey(PrefixAndDot + "connections.per.host")
 
+    final val mongodbThreadsAllowedToBlockForConnectionMultiplier =
+      IntKey(PrefixAndDot + "threads.allowed.to.block.for.connection.multiplier")
   }
 }
\ No newline at end of file