36 |
36 |
package gr.grnet.aquarium.store.mongodb
|
37 |
37 |
|
38 |
38 |
import gr.grnet.aquarium.util.Loggable
|
|
39 |
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
|
39 |
40 |
import com.mongodb._
|
40 |
|
import gr.grnet.aquarium.store.{MessageStoreException, MessageStore}
|
|
41 |
import util.{JSONParseException, JSON}
|
|
42 |
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
|
|
43 |
import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
|
41 |
44 |
|
42 |
45 |
/**
|
43 |
46 |
* Mongodb implementation of the message store.
|
... | ... | |
46 |
49 |
* @author Georgios Gousios <gousiosg@gmail.com>
|
47 |
50 |
*/
|
48 |
51 |
class MongoDBStore(host: String, port: String,
|
49 |
|
username: String, passwd: String)
|
50 |
|
extends MessageStore with Loggable {
|
|
52 |
username: String, passwd: String,
|
|
53 |
database: String)
|
|
54 |
extends EventStore with Loggable {
|
51 |
55 |
|
52 |
|
private[mongo] object Connection {
|
|
56 |
private object Connection {
|
53 |
57 |
lazy val mongo: Option[Mongo] = {
|
54 |
58 |
try {
|
55 |
59 |
val addr = new ServerAddress(host, port.toInt)
|
... | ... | |
67 |
71 |
}
|
68 |
72 |
}
|
69 |
73 |
|
70 |
|
private[store] lazy val events: DB = {
|
|
74 |
private[store] lazy val events: DBCollection = getCollection("events")
|
|
75 |
|
|
76 |
private[store] lazy val users: DBCollection = getCollection("user")
|
|
77 |
|
|
78 |
private[store]def getCollection(name: String): DBCollection = {
|
71 |
79 |
Connection.mongo match {
|
72 |
|
case Some(x) => x.getDB("events")
|
73 |
|
case None => throw new MessageStoreException("No connection to Mongo")
|
|
80 |
case Some(x) =>
|
|
81 |
val db = x.getDB(database)
|
|
82 |
if(!db.authenticate(username, passwd.toCharArray))
|
|
83 |
throw new StoreException("Could not authenticate user %s".format(username))
|
|
84 |
db.getCollection(name)
|
|
85 |
case None => throw new StoreException("No connection to Mongo")
|
74 |
86 |
}
|
75 |
87 |
}
|
76 |
88 |
|
77 |
|
private[store] lazy val users: DB = {
|
78 |
|
Connection.mongo match {
|
79 |
|
case Some(x) => x.getDB("users")
|
80 |
|
case None => throw new MessageStoreException("No connection to Mongo")
|
|
89 |
/* TODO: Some of the following methods rely on JSON (de-)serialization).
|
|
90 |
* A method based on proper object serialization would be much faster.
|
|
91 |
*/
|
|
92 |
|
|
93 |
//EventStore methods
|
|
94 |
def store[A <: AquariumEvent](event: A): Maybe[RecordID] = {
|
|
95 |
try {
|
|
96 |
// Store
|
|
97 |
val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
|
|
98 |
events.insert(obj)
|
|
99 |
|
|
100 |
// TODO: Make this retrieval a configurable option
|
|
101 |
// Get back to retrieve unique id
|
|
102 |
val q = new BasicDBObject()
|
|
103 |
q.put("id", event.id)
|
|
104 |
|
|
105 |
val cur = events.find(q)
|
|
106 |
|
|
107 |
if (!cur.hasNext) {
|
|
108 |
logger.error("Failed to store event: %s".format(event))
|
|
109 |
Failed(new StoreException("Failed to store event: %s".format(event)))
|
|
110 |
}
|
|
111 |
|
|
112 |
Just(RecordID(cur.next.get("id").toString))
|
|
113 |
} catch {
|
|
114 |
case j: JSONParseException =>
|
|
115 |
logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j)
|
|
116 |
case m: MongoException =>
|
|
117 |
logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
|
81 |
118 |
}
|
82 |
119 |
}
|
83 |
120 |
|
84 |
|
def storeString(message: String) = {
|
|
121 |
def findById[A <: AquariumEvent](id: String): Option[A] = {
|
|
122 |
val q = new BasicDBObject()
|
|
123 |
q.put("id", id)
|
|
124 |
|
|
125 |
val cur = events.find(q)
|
|
126 |
|
|
127 |
if (cur.hasNext)
|
|
128 |
deserialize(cur.next)
|
|
129 |
else
|
|
130 |
None
|
|
131 |
}
|
|
132 |
|
|
133 |
def findByUserId[A <: AquariumEvent](userId: String)
|
|
134 |
(sortWith: Option[(A, A) => Boolean]): List[A] = {
|
|
135 |
List()
|
|
136 |
}
|
85 |
137 |
|
|
138 |
private def deserialize[A <: AquariumEvent](a: DBObject): A = {
|
|
139 |
//TODO: Distinguish events and deserialize appropriately
|
|
140 |
ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
|
86 |
141 |
}
|
87 |
142 |
}
|