Revert to Java MongoDB driver, impl remaining methods
authorGeorgios Gousios <gousiosg@gmail.com>
Wed, 14 Dec 2011 15:36:08 +0000 (17:36 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Wed, 14 Dec 2011 15:36:08 +0000 (17:36 +0200)
Also clean up mongo after test

logic/pom.xml
logic/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala
logic/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
logic/src/test/scala/gr/grnet/aquarium/store/mongodb/EventStoreTest.scala
logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala

index 6815b38..fd3fe89 100644 (file)
       <version>1.4.1</version>
     </dependency>
 
-    <!--<dependency>
+    <dependency>
       <groupId>org.mongodb</groupId>
       <artifactId>mongo-java-driver</artifactId>
       <version>2.7.2</version>
-    </dependency>-->
+    </dependency>
 
-    <dependency>
+    <!--<dependency>
       <groupId>com.mongodb.casbah</groupId>
       <artifactId>casbah-core_2.9.1</artifactId>
       <version>2.1.5-1</version>
-    </dependency>
+    </dependency>-->
 
     <dependency>
       <groupId>se.scalablesolutions.akka</groupId>
index 71afd6a..df9b656 100644 (file)
@@ -90,7 +90,7 @@ object ResourceEvent {
   def fromJson(json: String): ResourceEvent = {
     implicit val formats = JsonHelpers.DefaultJsonFormats
     val jsonAST = parseJson(json)
-    Extraction.extract(jsonAST)
+    Extraction.extract[ResourceEvent](jsonAST)
   }
 
   def fromJValue(jsonAST: JsonAST.JValue): ResourceEvent = {
index c601a27..00369aa 100644 (file)
@@ -37,10 +37,11 @@ package gr.grnet.aquarium.store.mongodb
 
 import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
-import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
-import com.mongodb.{MongoException, MongoOptions, ServerAddress}
-import com.mongodb.casbah.Imports._
+import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
+import com.mongodb.util.JSON
+import com.mongodb._
+import collection.mutable.ListBuffer
 
 /**
  * Mongodb implementation of the message store.
@@ -53,34 +54,37 @@ class MongoDBStore(host: String, port: String,
                    database: String)
   extends EventStore with Loggable {
 
-
-  lazy val mongo: Option[MongoConnection] = {
-    try {
-      val addr = new ServerAddress(host, port.toInt)
-      val opt = new MongoOptions()
-      Some(MongoConnection(addr, opt))
-    } catch {
-      case e: MongoException =>
-        logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
-          "Cause:").format(host, port, username, e))
-        None
-      case nfe: NumberFormatException =>
-        logger.error("%s is not a valid port number".format(port))
-        None
+  private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
+
+  private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
+
+  // TODO: This must be initialized just once, to take advantage of Mongo's
+  // connection pooling
+  object Connection {
+    lazy val mongo: Option[Mongo] = {
+      try {
+        val addr = new ServerAddress(host, port.toInt)
+        val opt = new MongoOptions()
+        Some(new Mongo(addr, opt))
+      } catch {
+        case e: MongoException =>
+          logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
+            "Cause:").format(host, port, username, e))
+          None
+        case nfe: NumberFormatException =>
+          logger.error("%s is not a valid port number".format(port))
+          None
+      }
     }
   }
 
-  private[store] lazy val events: MongoCollection = getCollection("events")
-
-  private[store] lazy val users: MongoCollection = getCollection("user")
-
-  private[store]def getCollection(name: String): MongoCollection = {
-    mongo match {
+  def getCollection(name: String): DBCollection = {
+    Connection.mongo match {
       case Some(x) =>
         val db = x.getDB(database)
-        if(!db.authenticate(username, passwd))
+        if(!db.authenticate(username, passwd.toCharArray))
           throw new StoreException("Could not authenticate user %s".format(username))
-        db(name)
+        db.getCollection(name)
       case None => throw new StoreException("No connection to Mongo")
     }
   }
@@ -93,19 +97,22 @@ class MongoDBStore(host: String, port: String,
   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
     try {
       // Store
-      events += event
+      val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
+      events.insert(obj)
 
       // TODO: Make this retrieval a configurable option
       // Get back to retrieve unique id
-      val q = MongoDBObject("id" -> event.id)
+      val q = new BasicDBObject()
+      q.put("id", event.id)
+
       val cur = events.find(q)
 
       if (!cur.hasNext) {
         logger.error("Failed to store event: %s".format(event))
-        return Failed(new StoreException("Failed to store event: %s".format(event)))
+        Failed(new StoreException("Failed to store event: %s".format(event)))
       }
 
-      Just(RecordID(cur.next.get("id").toString))
+      Just(RecordID(cur.next.get("_id").toString))
     } catch {
       case m: MongoException =>
         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
@@ -113,32 +120,45 @@ class MongoDBStore(host: String, port: String,
   }
 
   def findEventById[A <: AquariumEvent](id: String): Option[A] = {
-    val a: Option[DBObject] = events.findOne(DBObject("id" -> id))
-    a.map(x => x: MongoDBObject).map(x => x: A)
+    val q = new BasicDBObject()
+    q.put("id", id)
+
+    val cur = events.find(q)
+
+    if (cur.hasNext)
+      Some(deserialize(cur.next))
+    else
+      None
   }
 
   def findEventsByUserId[A <: AquariumEvent](userId: Long)
-                                      (sortWith: Option[(A, A) => Boolean]): List[A] = {
-    List()
-  }
+                                            (sortWith: Option[(A, A) => Boolean]): List[A] = {
+    val q = new BasicDBObject()
+    q.put("userId", userId)
 
-  implicit def toMongoDBObject[A <: AquariumEvent](a: A): DBObject = {
+    val cur = events.find(q)
 
-    val builder = MongoDBObject.newBuilder
-    a.toMap.foreach{x => builder += x._1 -> x._2}
+    if (!cur.hasNext)
+      return List()
 
-    builder.result()
+    val buff = new ListBuffer[A]()
+
+    while(cur.hasNext)
+      buff += deserialize(cur.next)
+
+    sortWith match {
+      case Some(sorter) => buff.toList.sortWith(sorter)
+      case None => buff.toList
+    }
   }
 
-  implicit def toAqEventSubclass[A <: AquariumEvent](a: MongoDBObject): A = {
-    //TODO: This must be amended when we have more AquariumEvent subclasses
-    val id = a.getOrElse("id", "0").asInstanceOf[String]
-    val userId = a.getOrElse("userId", 0).asInstanceOf[Long]
-    val clientId = a.getOrElse("clientId", 0).asInstanceOf[Long]
-    val resource = a.getOrElse("resource", "").asInstanceOf[String]
-    val timestamp = a.getOrElse("timestamp", 0).asInstanceOf[Long]
-    val eventVersion = a.getOrElse("eventVersion", 0).asInstanceOf[Short]
-    val details = a.getOrElse("details", Map()).asInstanceOf[Map[String, String]]
-    ResourceEvent(id,userId,clientId,resource,timestamp,eventVersion,details).asInstanceOf[A]
+  private def deserialize[A <: AquariumEvent](a: DBObject): A = {
+    //TODO: Distinguish events and deserialize appropriately
+    ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
   }
 }
+
+object MongoDBStore {
+  def EVENTS_COLLECTION = "events"
+  def USERS_COLLECTION = "users"
+}
\ No newline at end of file
index 21cd5c2..47c2f90 100644 (file)
 
 package gr.grnet.aquarium.store.mongodb
 
-import org.junit.Test
 import gr.grnet.aquarium.util.{TestMethods, RandomEventGenerator}
 import org.junit.Assume._
 import org.junit.Assert._
-import gr.grnet.aquarium.LogicTestsAssumptions
-import gr.grnet.aquarium.store.{RecordID, Store}
+import gr.grnet.aquarium.store.Store
 import collection.mutable.ArrayBuffer
 import gr.grnet.aquarium.logic.events.ResourceEvent
+import org.junit.{Before, After, Test}
+import gr.grnet.aquarium.store.mongodb.MongoDBStore
+import gr.grnet.aquarium.{MasterConf, LogicTestsAssumptions}
 
 /**
- *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-
 class EventStoreTest extends TestMethods with RandomEventGenerator {
 
+  @Before
+  def before() = {
+
+  }
+
   @Test
   def testStoreEvent() = {
     assumeTrue(LogicTestsAssumptions.EnableMongoDBTests)
@@ -60,7 +64,20 @@ class EventStoreTest extends TestMethods with RandomEventGenerator {
 
     val result = store.get.storeEvent(event)
     assert(result.isJust)
-    assertEquals(event.id, result.getOr(RecordID("foo")).id)
+  }
+
+  @Test
+  def testFindEventById(): Unit = {
+    assumeTrue(LogicTestsAssumptions.EnableMongoDBTests)
+
+    val event = nextResourceEvent()
+    val store = Store.getEventStore()
+
+    val result1 = store.get.storeEvent(event)
+    assert(result1.isJust)
+
+    val result2 = store.get.findEventById[ResourceEvent](event.id)
+    assertNotNone(result2)
   }
 
   @Test
@@ -76,10 +93,40 @@ class EventStoreTest extends TestMethods with RandomEventGenerator {
         store.get.storeEvent(e)
     }
 
-    val mostUsedId = events.map{x => x.userId}.groupBy(identity).mapValues(_.size).foldLeft((0L,0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
-    println("Most used id:" + mostUsedId)
+    val mostUsedId = events
+      .map{x => x.userId}
+      .groupBy(identity)
+      .mapValues(_.size)
+      .foldLeft((0L,0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
+
+    val result = store.get.findEventsByUserId(mostUsedId)(None)
+    assertEquals(events.filter(p => p.userId.equals(mostUsedId)).size, result.size)
+  }
+
+  @Test
+  def testMultipleMongos = {
+    val a = getMongo
+    val b = getMongo
+    assertEquals(a.Connection.mongo.get.hashCode(), b.Connection.mongo.get.hashCode())
+  }
 
-    store.get.findEventsByUserId(mostUsedId)(None)
+  @After
+  override def after() = {
+    val a = getMongo
 
+    val col = a.Connection.mongo.get.getDB(
+      MasterConf.MasterConf.get(MasterConf.Keys.persistence_db)
+    ).getCollection(MongoDBStore.EVENTS_COLLECTION)
+
+    val res = col.find
+    while (res.hasNext)
+      col.remove(res.next)
   }
+
+  private def getMongo = new MongoDBStore(
+    MasterConf.MasterConf.get(MasterConf.Keys.persistence_host),
+    MasterConf.MasterConf.get(MasterConf.Keys.persistence_port),
+    MasterConf.MasterConf.get(MasterConf.Keys.persistence_username),
+    MasterConf.MasterConf.get(MasterConf.Keys.persistence_password),
+    MasterConf.MasterConf.get(MasterConf.Keys.persistence_db))
 }
\ No newline at end of file
index 294eb69..ce16839 100644 (file)
@@ -39,6 +39,7 @@ import akka.amqp._
 import gr.grnet.aquarium.logic.events.ResourceEvent
 import gr.grnet.aquarium.messaging.AkkaAMQP
 import util.Random
+import org.junit.After
 
 /**
  *  Generates random resource events to use as input for testing and
@@ -93,4 +94,9 @@ trait RandomEventGenerator extends AkkaAMQP {
         "event.%d.%s".format(event.cliendId, event.resource))
     }
   }
+
+  @After
+  def after() = {
+
+  }
 }
\ No newline at end of file