2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import com.mongodb.util.JSON
41 import gr.grnet.aquarium.user.UserState
42 import gr.grnet.aquarium.user.UserState.JsonNames
43 import gr.grnet.aquarium.util.displayableObjectInfo
44 import gr.grnet.aquarium.util.json.JsonSupport
45 import collection.mutable.{ListBuffer}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.logic.events.{WalletEntry, UserEvent, ResourceEvent, AquariumEvent}
52 * MongoDB implementation of the event store (and soon the user store).
54 * @author Christos KK Loverdos <loverdos@gmail.com>
55 * @author Georgios Gousios <gousiosg@gmail.com>
62 extends EventStore with UserStore
63 with IMStore with WalletStore with Loggable {
// Handles to the MongoDB collections used by this store. Each one is a lazy val, so
// getCollection (including the authentication it performs) runs on first access only.
// The collection names are read from the MongoDBStore object.
// Collection for general events (used by storeEvent, findEventById, findEventsByUserId).
65 private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
// Collection for user state documents (written by storeUserState).
66 private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
// Collection for user events (used by storeUserEvent, findUserEventById, findUserEventsByUserId).
67 private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
// Collection for wallet entries (used by store, findEntryById, findUserEntriesFromTo).
68 private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
/**
 * Resolves a collection of the configured database after authenticating.
 *
 * @param name the name of the collection to obtain
 * @return the collection handle
 * @throws StoreException if authentication with the configured credentials fails
 */
private[this] def getCollection(name: String): DBCollection = {
  val db = mongo.getDB(database)
  val authenticated = db.authenticate(username, password.toCharArray)

  if (authenticated) db.getCollection(name)
  else throw new StoreException("Could not authenticate user %s".format(username))
}
78 /* TODO: Some of the following methods rely on JSON (de-)serialization.
79 * A method based on proper object serialization would be much faster.
/**
 * Turns a Mongo document back into an event by round-tripping through JSON.
 *
 * The document is always parsed with `ResourceEvent.fromJson` and the result is
 * cast to the requested type `A`, whatever kind of event the caller asked for.
 *
 * @param a the Mongo document to convert
 * @return the parsed event, cast to `A`
 */
private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
  //TODO: Distinguish events and deserialize appropriately
  val json = JSON.serialize(a)
  val parsed = ResourceEvent.fromJson(json)
  parsed.asInstanceOf[A]
}
/**
 * Turns a Mongo document into a [[UserState]] by serializing it to JSON
 * and handing the JSON text to `UserState.fromJson`.
 *
 * @param dbObj the Mongo document to convert
 * @return the parsed user state
 */
private[this] def _deserializeUserState(dbObj: DBObject): UserState =
  UserState.fromJson(JSON.serialize(dbObj))
// Converts a JsonSupport value into a Mongo DBObject: the value's JSON text (any.toJson)
// is parsed with JSON.parse and the parse result is pattern-matched.
// Throws StoreException, naming the offending object and the target class, when the
// conversion is not possible.
// NOTE(review): the body of the DBObject case and the pattern guarding the throw are not
// visible in this excerpt - presumably the parsed DBObject is returned and every other
// parse result raises the exception; confirm against the full file.
92 private[this] def _makeDBObject(any: JsonSupport): DBObject = {
93 JSON.parse(any.toJson) match {
94 case dbObject: DBObject ⇒
97 throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
// Builds a one-entry document { name: value }; callers in this file pass it to
// DBCollection.find as an equality query on a single field.
// NOTE(review): the returned expression is not visible in this excerpt - presumably dbObj.
101 private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
// The argument 1 is presumably an initial size hint for the single entry - TODO confirm.
102 val dbObj = new BasicDBObject(1)
103 dbObj.put(name, value)
// Converts obj to a DBObject via _makeDBObject and inserts it into the given collection.
// NOTE(review): the returned expression is not visible in this excerpt - presumably the
// inserted dbObj.
107 private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
108 val dbObj = _makeDBObject(obj)
109 collection insert dbObj
// Verifies that an object can be found again after an insert and yields its Mongo "_id".
// The lookup is an equality query on the field idName with the value id.
// Throws StoreException when the query matches nothing.
// NOTE(review): some lines of this method are not visible in this excerpt; presumably
// retval is the returned value. The cursor is not closed in the visible lines.
113 private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport, idName: String, id: String): String = {
114 val cursor = collection.find(_prepareFieldQuery(idName, id))
// No match: the object is considered not stored.
115 if (!cursor.hasNext) {
116 val errMsg = "Failed to _store %s".format(displayableObjectInfo(obj))
118 throw new StoreException(errMsg)
// String form of the "_id" field of the first matching document.
121 val retval = cursor.next.get("_id").toString
// Stores an event in the given collection and reports the Mongo "_id" of the stored document.
// Result, as far as the visible lines show:
//   - Just(RecordID(<_id as string>)) when the entry is found again by its "id" field,
//   - Failed(StoreException) when the lookup by "id" matches nothing,
//   - Failed(m) when a MongoException m is caught.
// NOTE(review): the try header, the insert of dbObj and the catch header are not visible
// in this excerpt - presumably dbObj is inserted into col before the lookup; confirm.
// NOTE(review): the cursor is not closed in the visible lines.
126 private[this] def _store[A <: AquariumEvent](entry: A, col: DBCollection) : Maybe[RecordID] = {
129 val dbObj = _makeDBObject(entry)
132 // Get back to retrieve unique id
// The lookup uses the event's own "id" field, not Mongo's "_id".
133 val cursor = col.find(_prepareFieldQuery("id", entry.id))
135 if (!cursor.hasNext) {
136 logger.error("Failed to _store entry: %s".format(entry))
137 return Failed(new StoreException("Failed to _store entry: %s".format(entry)))
140 Just(RecordID(cursor.next.get("_id").toString))
142 case m: MongoException =>
143 logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
// Looks up a single event in the given collection and deserializes it via _deserializeEvent.
// NOTE(review): the line that puts the id criterion into q, the check that the cursor has
// a result, and the empty-result branch are not visible in this excerpt - presumably q is
// { "id": id } and None is returned when nothing matches; confirm.
147 private[this] def _findById[A <: AquariumEvent](id: String, col: DBCollection) : Option[A] = {
148 val q = new BasicDBObject()
151 val cur = col.find(q)
// Only the first matching document is used.
154 Some(_deserializeEvent(cur.next))
// Runs the query q, deserializes matching documents with _deserializeEvent into a buffer,
// and returns them as a List - ordered with the supplied comparison when sortWith is
// Some(sorter), in buffer order when it is None.
// NOTE(review): the second parameter (col, used below), the loop header around the
// buffer append and the match header are not visible in this excerpt - presumably the
// loop drains the cursor and the match is on sortWith; confirm.
159 private[this] def _query[A <: AquariumEvent](q: BasicDBObject,
161 (sortWith: Option[(A, A) => Boolean]): List[A] = {
162 val cur = col.find(q)
166 val buff = new ListBuffer[A]()
169 buff += _deserializeEvent(cur.next)
// Sorting happens in memory on the materialized list, not in the database.
172 case Some(sorter) => buff.toList.sortWith(sorter)
173 case None => buff.toList
// "Less than" comparison for ordering events by ascending timestamp:
// true when one is earlier than two, false when one is later than two.
// NOTE(review): the branch for equal timestamps is not visible in this excerpt.
177 private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
178 if (one.timestamp > two.timestamp) false
179 else if (one.timestamp < two.timestamp) true
// "Less than" comparison for ordering events by descending timestamp:
// true when one is later than two, false when one is earlier than two.
// NOTE(review): the branch for equal timestamps is not visible in this excerpt.
183 private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
184 if (one.timestamp < two.timestamp) false
185 else if (one.timestamp > two.timestamp) true
/**
 * Stores an event in the `events` collection.
 *
 * @param event the event to persist
 * @return whatever `_store` reports for this event and the `events` collection
 */
def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
  _store[A](event, events)
}
/**
 * Looks up an event in the `events` collection.
 *
 * @param id the identifier handed to `_findById`
 * @return the result of `_findById` for the `events` collection
 */
def findEventById[A <: AquariumEvent](id: String): Option[A] = {
  _findById[A](id, events)
}
/**
 * Finds the events of the `events` collection whose "userId" field equals `userId`.
 *
 * @param userId   the user id to match
 * @param sortWith optional comparison passed on to `_query` for ordering the result
 * @return the matching events as returned by `_query`
 */
def findEventsByUserId[A <: AquariumEvent](userId: Long)
                                          (sortWith: Option[(A, A) => Boolean]): List[A] = {
  val byUser = new BasicDBObject("userId", userId)
  _query(byUser, events)(sortWith)
}
// Stores a user state in the `users` collection, then verifies the insert by looking the
// document up again via the JsonNames.userId field.
// NOTE(review): the lines that wrap this body and build the Maybe[RecordID] result are not
// visible in this excerpt - presumably a RecordID is created from `id`; confirm.
204 def storeUserState(userState: UserState): Maybe[RecordID] = {
// dbObj is not used in the visible lines.
206 val dbObj = _insertObject(users, userState)
// Throws StoreException if the state cannot be found again; otherwise yields its "_id".
207 val id = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
// Looks up a user state by the JsonNames.userId field and deserializes the first match
// with _deserializeUserState.
// NOTE(review): several lines of this method (the wrapper producing the Maybe, the
// empty-result branch and the code after deserialization) are not visible in this excerpt.
212 def findUserStateByUserId(userId: String): Maybe[UserState] = {
214 val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
// NOTE(review): this queries the `events` collection, while storeUserState inserts user
// states into `users` - this looks like a bug (stored states would never be found here);
// confirm and switch to `users` if so.
215 val cursor = events.find(queryObj)
217 if(!cursor.hasNext) {
221 val userState = _deserializeUserState(cursor.next())
/**
 * Stores a user event in the `imevents` collection.
 *
 * @param event the user event to persist
 * @return whatever `_store` reports for this event and the `imevents` collection
 */
def storeUserEvent(event: UserEvent): Maybe[RecordID] = {
  _store[UserEvent](event, imevents)
}
/**
 * Looks up a user event in the `imevents` collection.
 *
 * @param id the identifier handed to `_findById`
 * @return the result of `_findById` for the `imevents` collection
 */
def findUserEventById[A <: UserEvent](id: String): Option[A] = {
  _findById[A](id, imevents)
}
/**
 * Finds the user events of the `imevents` collection whose "userId" field equals `userId`.
 *
 * @param userId   the user id to match
 * @param sortWith optional comparison passed on to `_query` for ordering the result
 * @return the matching user events as returned by `_query`
 */
def findUserEventsByUserId[A <: UserEvent](userId: String)
                                          (sortWith: Option[(A, A) => Boolean]): List[A] = {
  val byUser = new BasicDBObject("userId", userId)
  _query(byUser, imevents)(sortWith)
}
/**
 * Returns the first of the user's events after ordering them with
 * `_sortByTimestampDesc`, or `None` when the user has no events.
 *
 * @param userId the user whose events are searched
 */
def findLastUserEvent(userId: String) = {
  // The comparison puts later timestamps first, so the head is the latest event.
  findUserEventsByUserId(userId)(Some(_sortByTimestampDesc)).headOption
}
/**
 * Placeholder implementation: always answers `false`, whatever the user id.
 *
 * @param userId the user id to check (currently ignored)
 */
def userExists(userId: String) = {
  //TODO: Write me
  false
}
/**
 * Stores a wallet entry in the `wallets` collection.
 *
 * @param entry the wallet entry to persist
 * @return whatever `_store` reports for this entry and the `wallets` collection
 */
def store(entry: WalletEntry): Maybe[RecordID] = {
  _store[WalletEntry](entry, wallets)
}
/**
 * Looks up a wallet entry in the `wallets` collection.
 *
 * @param id the identifier handed to `_findById`
 * @return the result of `_findById` for the `wallets` collection
 */
def findEntryById(id: String): Option[WalletEntry] = {
  _findById[WalletEntry](id, wallets)
}
/**
 * Finds every wallet entry of a user by delegating to `findUserEntriesFromTo`
 * with the widest time window expressible as `Date`s.
 *
 * The upper bound is `Long.MaxValue` milliseconds. `java.util.Date` counts
 * milliseconds since the epoch, so the previous bound of `Int.MaxValue` was only
 * about 24.8 days after 1970-01-01 and cut off all entries with later timestamps.
 *
 * @param userId the user whose wallet entries are requested
 * @return the result of `findUserEntriesFromTo` for the full time window
 */
def findAllUserEntries(userId: String) =
  findUserEntriesFromTo(userId, new Date(0), new Date(Long.MaxValue))
/**
 * Finds the wallet entries of a user whose "timestamp" lies strictly between
 * `from` and `to`, ordered with `_sortByTimestampAsc`.
 *
 * @param userId the value the "userId" field must equal
 * @param from   exclusive lower bound; compared via `from.getTime`
 * @param to     exclusive upper bound; compared via `to.getTime`
 * @return the matching wallet entries as returned by `_query`
 */
def findUserEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
  // Both bounds must live in ONE sub-document under the "timestamp" key. Putting the
  // key twice on the query object makes the second put replace the first, which
  // silently dropped the lower bound.
  val timeRange = new BasicDBObject("$gt", from.getTime)
  timeRange.put("$lt", to.getTime)

  val q = new BasicDBObject()
  q.put("timestamp", timeRange)
  q.put("userId", userId)

  _query[WalletEntry](q, wallets)(Some(_sortByTimestampAsc))
}
/**
 * Names of the MongoDB collections used by the store.
 */
object MongoDBStore {
  /** Collection name for general events. */
  final val EVENTS_COLLECTION = "events"

  /** Collection name for user state documents. */
  final val USERS_COLLECTION = "users"

  /** Collection name for user events. */
  final val IM_EVENTS_COLLECTION = "imevents"

  /** Collection name for wallet entries. */
  final val IM_WALLETS = "wallets"
}