Commit for posterity
authorGeorgios Gousios <gousiosg@gmail.com>
Wed, 14 Dec 2011 13:12:19 +0000 (15:12 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Wed, 14 Dec 2011 15:27:36 +0000 (17:27 +0200)
Will revert to plain Java driver for now

logic/pom.xml
logic/src/main/scala/gr/grnet/aquarium/logic/events/AquariumEvent.scala
logic/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

index 6e0197c..6815b38 100644 (file)
       <version>1.4.1</version>
     </dependency>
 
-    <dependency>
+    <!--<dependency>
       <groupId>org.mongodb</groupId>
       <artifactId>mongo-java-driver</artifactId>
       <version>2.7.2</version>
-    </dependency>
+    </dependency>-->
 
-    <!-- Official MongoDB scala driver -->
-    <!-- For issues with the driver see: https://jira.mongodb.org/browse/SCALA -->
-    <!--<dependency>-->
-    <!--<groupId>com.mongodb.casbah</groupId>-->
-    <!--<artifactId>casbah-core_2.9.1</artifactId>-->
-    <!--<version>2.1.5-1</version>-->
-    <!--</dependency>-->
+    <dependency>
+      <groupId>com.mongodb.casbah</groupId>
+      <artifactId>casbah-core_2.9.1</artifactId>
+      <version>2.1.5-1</version>
+    </dependency>
 
     <dependency>
       <groupId>se.scalablesolutions.akka</groupId>
index 089fb9f..749e760 100644 (file)
@@ -58,4 +58,11 @@ abstract class AquariumEvent(val id: String, val timestamp: Long) extends JsonSu
   }
 
   def eventType: String = shortClassNameOf(this)
+
+  def toMap: Map[String,  Any] =
+    (Map[String, Any]() /: this.getClass.getDeclaredFields) {
+      (a, f) =>
+        f.setAccessible(true)
+        a + (f.getName -> f.get(this))
+      }
 }
index 2553bbd..c601a27 100644 (file)
@@ -37,10 +37,10 @@ package gr.grnet.aquarium.store.mongodb
 
 import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
-import com.mongodb._
-import util.{JSONParseException, JSON}
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
 import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
+import com.ckkloverdos.maybe.{Failed, Just, Maybe}
+import com.mongodb.{MongoException, MongoOptions, ServerAddress}
+import com.mongodb.casbah.Imports._
 
 /**
  * Mongodb implementation of the message store.
@@ -53,35 +53,34 @@ class MongoDBStore(host: String, port: String,
                    database: String)
   extends EventStore with Loggable {
 
-  private object Connection {
-    lazy val mongo: Option[Mongo] = {
-      try {
-        val addr = new ServerAddress(host, port.toInt)
-        val opt = new MongoOptions()
-        Some(new Mongo(addr, opt))
-      } catch {
-        case e: MongoException =>
-          logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
-            "Cause:").format(host,port,username, e))
-          None
-        case nfe: NumberFormatException =>
-          logger.error("%s is not a valid port number".format(port))
-          None
-      }
+
+  lazy val mongo: Option[MongoConnection] = {
+    try {
+      val addr = new ServerAddress(host, port.toInt)
+      val opt = new MongoOptions()
+      Some(MongoConnection(addr, opt))
+    } catch {
+      case e: MongoException =>
+        logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
+          "Cause:").format(host, port, username, e))
+        None
+      case nfe: NumberFormatException =>
+        logger.error("%s is not a valid port number".format(port))
+        None
     }
   }
 
-  private[store] lazy val events: DBCollection = getCollection("events")
+  private[store] lazy val events: MongoCollection = getCollection("events")
 
-  private[store] lazy val users: DBCollection = getCollection("user")
+  private[store] lazy val users: MongoCollection = getCollection("user")
 
-  private[store]def getCollection(name: String): DBCollection = {
-    Connection.mongo match {
+  private[store]def getCollection(name: String): MongoCollection = {
+    mongo match {
       case Some(x) =>
         val db = x.getDB(database)
-        if(!db.authenticate(username, passwd.toCharArray))
+        if(!db.authenticate(username, passwd))
           throw new StoreException("Could not authenticate user %s".format(username))
-        db.getCollection(name)
+        db(name)
       case None => throw new StoreException("No connection to Mongo")
     }
   }
@@ -94,40 +93,28 @@ class MongoDBStore(host: String, port: String,
   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
     try {
       // Store
-      val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
-      events.insert(obj)
+      events += event
 
       // TODO: Make this retrieval a configurable option
       // Get back to retrieve unique id
-      val q = new BasicDBObject()
-      q.put("id", event.id)
-
+      val q = MongoDBObject("id" -> event.id)
       val cur = events.find(q)
 
       if (!cur.hasNext) {
         logger.error("Failed to store event: %s".format(event))
-        Failed(new StoreException("Failed to store event: %s".format(event)))
+        return Failed(new StoreException("Failed to store event: %s".format(event)))
       }
 
       Just(RecordID(cur.next.get("id").toString))
     } catch {
-      case j: JSONParseException =>
-        logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j)
       case m: MongoException =>
         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
     }
   }
 
   def findEventById[A <: AquariumEvent](id: String): Option[A] = {
-    val q = new BasicDBObject()
-    q.put("id", id)
-
-    val cur = events.find(q)
-
-    if (cur.hasNext)
-      deserialize(cur.next)
-    else
-      None
+    val a: Option[DBObject] = events.findOne(DBObject("id" -> id))
+    a.map(x => x: MongoDBObject).map(x => x: A)
   }
 
   def findEventsByUserId[A <: AquariumEvent](userId: Long)
@@ -135,8 +122,23 @@ class MongoDBStore(host: String, port: String,
     List()
   }
 
-  private def deserialize[A <: AquariumEvent](a: DBObject): A = {
-    //TODO: Distinguish events and deserialize appropriately
-    ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
+  implicit def toMongoDBObject[A <: AquariumEvent](a: A): DBObject = {
+
+    val builder = MongoDBObject.newBuilder
+    a.toMap.foreach{x => builder += x._1 -> x._2}
+
+    builder.result()
+  }
+
+  implicit def toAqEventSubclass[A <: AquariumEvent](a: MongoDBObject): A = {
+    //TODO: This must be amended when we have more AquariumEvent subclasses
+    val id = a.getOrElse("id", "0").asInstanceOf[String]
+    val userId = a.getOrElse("userId", 0).asInstanceOf[Long]
+    val clientId = a.getOrElse("clientId", 0).asInstanceOf[Long]
+    val resource = a.getOrElse("resource", "").asInstanceOf[String]
+    val timestamp = a.getOrElse("timestamp", 0).asInstanceOf[Long]
+    val eventVersion = a.getOrElse("eventVersion", 0).asInstanceOf[Short]
+    val details = a.getOrElse("details", Map()).asInstanceOf[Map[String, String]]
+    ResourceEvent(id,userId,clientId,resource,timestamp,eventVersion,details).asInstanceOf[A]
   }
 }