root / logic / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala @ 492a2d49
History | View | Annotate | Download (4.8 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.store.mongodb |
37 |
|
38 |
import gr.grnet.aquarium.util.Loggable |
39 |
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore} |
40 |
import com.mongodb._ |
41 |
import util.{JSONParseException, JSON} |
42 |
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} |
43 |
import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent} |
44 |
|
45 |
/** |
46 |
* Mongodb implementation of the message store. |
47 |
* |
48 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
49 |
* @author Georgios Gousios <gousiosg@gmail.com> |
50 |
*/ |
51 |
class MongoDBStore(host: String, port: String, |
52 |
username: String, passwd: String, |
53 |
database: String) |
54 |
extends EventStore with Loggable { |
55 |
|
56 |
private object Connection { |
57 |
lazy val mongo: Option[Mongo] = { |
58 |
try { |
59 |
val addr = new ServerAddress(host, port.toInt) |
60 |
val opt = new MongoOptions() |
61 |
Some(new Mongo(addr, opt)) |
62 |
} catch { |
63 |
case e: MongoException => |
64 |
logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " + |
65 |
"Cause:").format(host,port,username, e)) |
66 |
None |
67 |
case nfe: NumberFormatException => |
68 |
logger.error("%s is not a valid port number".format(port)) |
69 |
None |
70 |
} |
71 |
} |
72 |
} |
73 |
|
74 |
private[store] lazy val events: DBCollection = getCollection("events") |
75 |
|
76 |
private[store] lazy val users: DBCollection = getCollection("user") |
77 |
|
78 |
private[store]def getCollection(name: String): DBCollection = { |
79 |
Connection.mongo match { |
80 |
case Some(x) => |
81 |
val db = x.getDB(database) |
82 |
if(!db.authenticate(username, passwd.toCharArray)) |
83 |
throw new StoreException("Could not authenticate user %s".format(username)) |
84 |
db.getCollection(name) |
85 |
case None => throw new StoreException("No connection to Mongo") |
86 |
} |
87 |
} |
88 |
|
89 |
/* TODO: Some of the following methods rely on JSON (de-)serialization). |
90 |
* A method based on proper object serialization would be much faster. |
91 |
*/ |
92 |
|
93 |
//EventStore methods |
94 |
def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = { |
95 |
try { |
96 |
// Store |
97 |
val obj = JSON.parse(event.toJson).asInstanceOf[DBObject] |
98 |
events.insert(obj) |
99 |
|
100 |
// TODO: Make this retrieval a configurable option |
101 |
// Get back to retrieve unique id |
102 |
val q = new BasicDBObject() |
103 |
q.put("id", event.id) |
104 |
|
105 |
val cur = events.find(q) |
106 |
|
107 |
if (!cur.hasNext) { |
108 |
logger.error("Failed to store event: %s".format(event)) |
109 |
Failed(new StoreException("Failed to store event: %s".format(event))) |
110 |
} |
111 |
|
112 |
Just(RecordID(cur.next.get("id").toString)) |
113 |
} catch { |
114 |
case j: JSONParseException => |
115 |
logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j) |
116 |
case m: MongoException => |
117 |
logger.error("Unknown Mongo error: %s".format(m)); Failed(m) |
118 |
} |
119 |
} |
120 |
|
121 |
def findEventById[A <: AquariumEvent](id: String): Option[A] = { |
122 |
val q = new BasicDBObject() |
123 |
q.put("id", id) |
124 |
|
125 |
val cur = events.find(q) |
126 |
|
127 |
if (cur.hasNext) |
128 |
deserialize(cur.next) |
129 |
else |
130 |
None |
131 |
} |
132 |
|
133 |
def findEventsByUserId[A <: AquariumEvent](userId: Long) |
134 |
(sortWith: Option[(A, A) => Boolean]): List[A] = { |
135 |
List() |
136 |
} |
137 |
|
138 |
private def deserialize[A <: AquariumEvent](a: DBObject): A = { |
139 |
//TODO: Distinguish events and deserialize appropriately |
140 |
ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A] |
141 |
} |
142 |
} |