import gr.grnet.aquarium.util.Loggable
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
-import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
import com.ckkloverdos.maybe.{Failed, Just, Maybe}
-import com.mongodb.{MongoException, MongoOptions, ServerAddress}
-import com.mongodb.casbah.Imports._
+import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
+import com.mongodb.util.JSON
+import com.mongodb._
+import collection.mutable.ListBuffer
/**
* Mongodb implementation of the message store.
database: String)
extends EventStore with Loggable {
-
- lazy val mongo: Option[MongoConnection] = {
- try {
- val addr = new ServerAddress(host, port.toInt)
- val opt = new MongoOptions()
- Some(MongoConnection(addr, opt))
- } catch {
- case e: MongoException =>
- logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
- "Cause:").format(host, port, username, e))
- None
- case nfe: NumberFormatException =>
- logger.error("%s is not a valid port number".format(port))
- None
+ private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
+
+ private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
+
+ // TODO: This must be initialized just once, to take advantage of Mongo's
+ // connection pooling
+ object Connection {
+ lazy val mongo: Option[Mongo] = {
+ try {
+ val addr = new ServerAddress(host, port.toInt)
+ val opt = new MongoOptions()
+ Some(new Mongo(addr, opt))
+ } catch {
+ case e: MongoException =>
+ logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
+ "Cause:").format(host, port, username, e))
+ None
+ case nfe: NumberFormatException =>
+ logger.error("%s is not a valid port number".format(port))
+ None
+ }
}
}
- private[store] lazy val events: MongoCollection = getCollection("events")
-
- private[store] lazy val users: MongoCollection = getCollection("user")
-
- private[store]def getCollection(name: String): MongoCollection = {
- mongo match {
+ def getCollection(name: String): DBCollection = {
+ Connection.mongo match {
case Some(x) =>
val db = x.getDB(database)
- if(!db.authenticate(username, passwd))
+ if(!db.authenticate(username, passwd.toCharArray))
throw new StoreException("Could not authenticate user %s".format(username))
- db(name)
+ db.getCollection(name)
case None => throw new StoreException("No connection to Mongo")
}
}
def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
try {
// Store
- events += event
+ val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
+ events.insert(obj)
// TODO: Make this retrieval a configurable option
// Get back to retrieve unique id
- val q = MongoDBObject("id" -> event.id)
+ val q = new BasicDBObject()
+ q.put("id", event.id)
+
val cur = events.find(q)
if (!cur.hasNext) {
logger.error("Failed to store event: %s".format(event))
- return Failed(new StoreException("Failed to store event: %s".format(event)))
+ Failed(new StoreException("Failed to store event: %s".format(event)))
}
- Just(RecordID(cur.next.get("id").toString))
+ Just(RecordID(cur.next.get("_id").toString))
} catch {
case m: MongoException =>
logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
}
def findEventById[A <: AquariumEvent](id: String): Option[A] = {
- val a: Option[DBObject] = events.findOne(DBObject("id" -> id))
- a.map(x => x: MongoDBObject).map(x => x: A)
+ val q = new BasicDBObject()
+ q.put("id", id)
+
+ val cur = events.find(q)
+
+ if (cur.hasNext)
+ Some(deserialize(cur.next))
+ else
+ None
}
def findEventsByUserId[A <: AquariumEvent](userId: Long)
- (sortWith: Option[(A, A) => Boolean]): List[A] = {
- List()
- }
+ (sortWith: Option[(A, A) => Boolean]): List[A] = {
+ val q = new BasicDBObject()
+ q.put("userId", userId)
- implicit def toMongoDBObject[A <: AquariumEvent](a: A): DBObject = {
+ val cur = events.find(q)
- val builder = MongoDBObject.newBuilder
- a.toMap.foreach{x => builder += x._1 -> x._2}
+ if (!cur.hasNext)
+ return List()
- builder.result()
+ val buff = new ListBuffer[A]()
+
+ while(cur.hasNext)
+ buff += deserialize(cur.next)
+
+ sortWith match {
+ case Some(sorter) => buff.toList.sortWith(sorter)
+ case None => buff.toList
+ }
}
- implicit def toAqEventSubclass[A <: AquariumEvent](a: MongoDBObject): A = {
- //TODO: This must be amended when we have more AquariumEvent subclasses
- val id = a.getOrElse("id", "0").asInstanceOf[String]
- val userId = a.getOrElse("userId", 0).asInstanceOf[Long]
- val clientId = a.getOrElse("clientId", 0).asInstanceOf[Long]
- val resource = a.getOrElse("resource", "").asInstanceOf[String]
- val timestamp = a.getOrElse("timestamp", 0).asInstanceOf[Long]
- val eventVersion = a.getOrElse("eventVersion", 0).asInstanceOf[Short]
- val details = a.getOrElse("details", Map()).asInstanceOf[Map[String, String]]
- ResourceEvent(id,userId,clientId,resource,timestamp,eventVersion,details).asInstanceOf[A]
+ private def deserialize[A <: AquariumEvent](a: DBObject): A = {
+ //TODO: Distinguish events and deserialize appropriately
+ ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
}
}
+
+object MongoDBStore {
+ def EVENTS_COLLECTION = "events"
+ def USERS_COLLECTION = "users"
+}
\ No newline at end of file
package gr.grnet.aquarium.store.mongodb
-import org.junit.Test
import gr.grnet.aquarium.util.{TestMethods, RandomEventGenerator}
import org.junit.Assume._
import org.junit.Assert._
-import gr.grnet.aquarium.LogicTestsAssumptions
-import gr.grnet.aquarium.store.{RecordID, Store}
+import gr.grnet.aquarium.store.Store
import collection.mutable.ArrayBuffer
import gr.grnet.aquarium.logic.events.ResourceEvent
+import org.junit.{Before, After, Test}
+import gr.grnet.aquarium.store.mongodb.MongoDBStore
+import gr.grnet.aquarium.{MasterConf, LogicTestsAssumptions}
/**
- *
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-
class EventStoreTest extends TestMethods with RandomEventGenerator {
+ @Before
+ def before() = {
+
+ }
+
@Test
def testStoreEvent() = {
assumeTrue(LogicTestsAssumptions.EnableMongoDBTests)
val result = store.get.storeEvent(event)
assert(result.isJust)
- assertEquals(event.id, result.getOr(RecordID("foo")).id)
+ }
+
+ @Test
+ def testFindEventById(): Unit = {
+ assumeTrue(LogicTestsAssumptions.EnableMongoDBTests)
+
+ val event = nextResourceEvent()
+ val store = Store.getEventStore()
+
+ val result1 = store.get.storeEvent(event)
+ assert(result1.isJust)
+
+ val result2 = store.get.findEventById[ResourceEvent](event.id)
+ assertNotNone(result2)
}
@Test
store.get.storeEvent(e)
}
- val mostUsedId = events.map{x => x.userId}.groupBy(identity).mapValues(_.size).foldLeft((0L,0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
- println("Most used id:" + mostUsedId)
+ val mostUsedId = events
+ .map{x => x.userId}
+ .groupBy(identity)
+ .mapValues(_.size)
+ .foldLeft((0L,0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
+
+ val result = store.get.findEventsByUserId(mostUsedId)(None)
+ assertEquals(events.filter(p => p.userId.equals(mostUsedId)).size, result.size)
+ }
+
+ @Test
+ def testMultipleMongos = {
+ val a = getMongo
+ val b = getMongo
+ assertEquals(a.Connection.mongo.get.hashCode(), b.Connection.mongo.get.hashCode())
+ }
- store.get.findEventsByUserId(mostUsedId)(None)
+ @After
+ override def after() = {
+ val a = getMongo
+ val col = a.Connection.mongo.get.getDB(
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_db)
+ ).getCollection(MongoDBStore.EVENTS_COLLECTION)
+
+ val res = col.find
+ while (res.hasNext)
+ col.remove(res.next)
}
+
+ private def getMongo = new MongoDBStore(
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_host),
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_port),
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_username),
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_password),
+ MasterConf.MasterConf.get(MasterConf.Keys.persistence_db))
}
\ No newline at end of file