Revision 85aa28b3

b/logic/src/main/scala/gr/grnet/aquarium/store/Store.scala
66 66
    MasterConf.MasterConf.get(MasterConf.Keys.persistence_port)
67 67
  }
68 68

  
69
  def getConnection(): Option[MessageStore] = {
69
  private lazy val db = {
70
    MasterConf.MasterConf.get(MasterConf.Keys.persistence_db)
71
  }
72

  
73
  def getEventStore(): Option[EventStore] = {
70 74
    provider match {
71 75
      case "mongodb" =>
72
        Some(new MongoDBStore(host, port, uname, passwd))
76
        Some(new MongoDBStore(host, port, uname, passwd, db))
73 77
      case _ => 
74 78
        logger.error("Provider <%s> not supported".format(provider))
75 79
        None
b/logic/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
36 36
package gr.grnet.aquarium.store.mongodb
37 37

  
38 38
import gr.grnet.aquarium.util.Loggable
39
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
39 40
import com.mongodb._
40
import gr.grnet.aquarium.store.{MessageStoreException, MessageStore}
41
import util.{JSONParseException, JSON}
42
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
43
import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
41 44

  
42 45
/**
43 46
 * Mongodb implementation of the message store.
......
46 49
 * @author Georgios Gousios <gousiosg@gmail.com>
47 50
 */
48 51
class MongoDBStore(host: String, port: String,
49
                   username: String, passwd: String)
50
  extends MessageStore with Loggable {
52
                   username: String, passwd: String,
53
                   database: String)
54
  extends EventStore with Loggable {
51 55

  
52
  private[mongo] object Connection {
56
  private object Connection {
53 57
    lazy val mongo: Option[Mongo] = {
54 58
      try {
55 59
        val addr = new ServerAddress(host, port.toInt)
......
67 71
    }
68 72
  }
69 73

  
70
  private[store] lazy val events: DB = {
74
  private[store] lazy val events: DBCollection = getCollection("events")
75

  
76
  private[store] lazy val users: DBCollection = getCollection("user")
77

  
78
  private[store]def getCollection(name: String): DBCollection = {
71 79
    Connection.mongo match {
72
      case Some(x) => x.getDB("events")
73
      case None => throw new MessageStoreException("No connection to Mongo")
80
      case Some(x) =>
81
        val db = x.getDB(database)
82
        if(!db.authenticate(username, passwd.toCharArray))
83
          throw new StoreException("Could not authenticate user %s".format(username))
84
        db.getCollection(name)
85
      case None => throw new StoreException("No connection to Mongo")
74 86
    }
75 87
  }
76 88

  
77
  private[store] lazy val users: DB = {
78
    Connection.mongo match {
79
      case Some(x) => x.getDB("users")
80
      case None => throw new MessageStoreException("No connection to Mongo")
89
  /* TODO: Some of the following methods rely on JSON (de-)serialization).
90
   * A method based on proper object serialization would be much faster.
91
   */
92

  
93
  //EventStore methods
94
  def store[A <: AquariumEvent](event: A): Maybe[RecordID] = {
95
    try {
96
      // Store
97
      val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
98
      events.insert(obj)
99

  
100
      // TODO: Make this retrieval a configurable option
101
      // Get back to retrieve unique id
102
      val q = new BasicDBObject()
103
      q.put("id", event.id)
104

  
105
      val cur = events.find(q)
106

  
107
      if (!cur.hasNext) {
108
        logger.error("Failed to store event: %s".format(event))
109
        Failed(new StoreException("Failed to store event: %s".format(event)))
110
      }
111

  
112
      Just(RecordID(cur.next.get("id").toString))
113
    } catch {
114
      case j: JSONParseException =>
115
        logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j)
116
      case m: MongoException =>
117
        logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
81 118
    }
82 119
  }
83 120

  
84
  def storeString(message: String) = {
121
  def findById[A <: AquariumEvent](id: String): Option[A] = {
122
    val q = new BasicDBObject()
123
    q.put("id", id)
124

  
125
    val cur = events.find(q)
126

  
127
    if (cur.hasNext)
128
      deserialize(cur.next)
129
    else
130
      None
131
  }
132

  
133
  def findByUserId[A <: AquariumEvent](userId: String)
134
                                      (sortWith: Option[(A, A) => Boolean]): List[A] = {
135
    List()
136
  }
85 137

  
138
  private def deserialize[A <: AquariumEvent](a: DBObject): A = {
139
    //TODO: Distinguish events and deserialize appropriately
140
    ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
86 141
  }
87 142
}

Also available in: Unified diff