import gr.grnet.aquarium.util.Loggable
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
-import com.mongodb._
-import util.{JSONParseException, JSON}
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
+import com.ckkloverdos.maybe.{Failed, Just, Maybe}
+import com.mongodb.{MongoException, MongoOptions, ServerAddress}
+import com.mongodb.casbah.Imports._
/**
* Mongodb implementation of the message store.
database: String)
extends EventStore with Loggable {
- private object Connection {
- lazy val mongo: Option[Mongo] = {
- try {
- val addr = new ServerAddress(host, port.toInt)
- val opt = new MongoOptions()
- Some(new Mongo(addr, opt))
- } catch {
- case e: MongoException =>
- logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
- "Cause:").format(host,port,username, e))
- None
- case nfe: NumberFormatException =>
- logger.error("%s is not a valid port number".format(port))
- None
- }
+
+ lazy val mongo: Option[MongoConnection] = {
+ try {
+ val addr = new ServerAddress(host, port.toInt)
+ val opt = new MongoOptions()
+ Some(MongoConnection(addr, opt))
+ } catch {
+ case e: MongoException =>
+ logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
+ "Cause:").format(host, port, username, e))
+ None
+ case nfe: NumberFormatException =>
+ logger.error("%s is not a valid port number".format(port))
+ None
}
}
- private[store] lazy val events: DBCollection = getCollection("events")
+ private[store] lazy val events: MongoCollection = getCollection("events")
- private[store] lazy val users: DBCollection = getCollection("user")
+ private[store] lazy val users: MongoCollection = getCollection("user")
- private[store]def getCollection(name: String): DBCollection = {
- Connection.mongo match {
+ private[store]def getCollection(name: String): MongoCollection = {
+ mongo match {
case Some(x) =>
val db = x.getDB(database)
- if(!db.authenticate(username, passwd.toCharArray))
+ if(!db.authenticate(username, passwd))
throw new StoreException("Could not authenticate user %s".format(username))
- db.getCollection(name)
+ db(name)
case None => throw new StoreException("No connection to Mongo")
}
}
def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
try {
// Store
- val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
- events.insert(obj)
+ events += event
// TODO: Make this retrieval a configurable option
// Get back to retrieve unique id
- val q = new BasicDBObject()
- q.put("id", event.id)
-
+ val q = MongoDBObject("id" -> event.id)
val cur = events.find(q)
if (!cur.hasNext) {
logger.error("Failed to store event: %s".format(event))
- Failed(new StoreException("Failed to store event: %s".format(event)))
+ return Failed(new StoreException("Failed to store event: %s".format(event)))
}
Just(RecordID(cur.next.get("id").toString))
} catch {
- case j: JSONParseException =>
- logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j)
case m: MongoException =>
logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
}
}
def findEventById[A <: AquariumEvent](id: String): Option[A] = {
- val q = new BasicDBObject()
- q.put("id", id)
-
- val cur = events.find(q)
-
- if (cur.hasNext)
- deserialize(cur.next)
- else
- None
+ val a: Option[DBObject] = events.findOne(DBObject("id" -> id))
+ a.map(x => x: MongoDBObject).map(x => x: A)
}
def findEventsByUserId[A <: AquariumEvent](userId: Long)
List()
}
- private def deserialize[A <: AquariumEvent](a: DBObject): A = {
- //TODO: Distinguish events and deserialize appropriately
- ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
+ implicit def toMongoDBObject[A <: AquariumEvent](a: A): DBObject = {
+
+ val builder = MongoDBObject.newBuilder
+ a.toMap.foreach{x => builder += x._1 -> x._2}
+
+ builder.result()
+ }
+
+ implicit def toAqEventSubclass[A <: AquariumEvent](a: MongoDBObject): A = {
+ //TODO: This must be amended when we have more AquariumEvent subclasses
+ val id = a.getOrElse("id", "0").asInstanceOf[String]
+ val userId = a.getOrElse("userId", 0).asInstanceOf[Long]
+ val clientId = a.getOrElse("clientId", 0).asInstanceOf[Long]
+ val resource = a.getOrElse("resource", "").asInstanceOf[String]
+ val timestamp = a.getOrElse("timestamp", 0).asInstanceOf[Long]
+ val eventVersion = a.getOrElse("eventVersion", 0).asInstanceOf[Short]
+ val details = a.getOrElse("details", Map()).asInstanceOf[Map[String, String]]
+ ResourceEvent(id,userId,clientId,resource,timestamp,eventVersion,details).asInstanceOf[A]
}
}