2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import com.mongodb.util.JSON
41 import gr.grnet.aquarium.user.UserState
42 import gr.grnet.aquarium.user.UserState.JsonNames
43 import gr.grnet.aquarium.util.displayableObjectInfo
44 import gr.grnet.aquarium.util.json.JsonSupport
45 import collection.mutable.{ListBuffer}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.logic.events.{WalletEntry, UserEvent, ResourceEvent, AquariumEvent}
52 * Mongodb implementation of the event _store (and soon the user _store).
54 * @author Christos KK Loverdos <loverdos@gmail.com>
55 * @author Georgios Gousios <gousiosg@gmail.com>
62 extends EventStore with UserStore with WalletStore with Loggable {
64 private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
65 private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66 private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67 private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
69 private[this] def getCollection(name: String): DBCollection = {
70 val db = mongo.getDB(database)
71 if(!db.authenticate(username, password.toCharArray)) {
72 throw new StoreException("Could not authenticate user %s".format(username))
74 db.getCollection(name)
77 /* TODO: Some of the following methods rely on JSON (de-)serialization).
78 * A method based on proper object serialization would be much faster.
81 private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
82 //TODO: Distinguish events and deserialize appropriately
83 ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
86 private[this] def _deserializeUserState(dbObj: DBObject): UserState = {
87 val jsonString = JSON.serialize(dbObj)
88 UserState.fromJson(jsonString)
91 private[this] def _makeDBObject(any: JsonSupport): DBObject = {
92 JSON.parse(any.toJson) match {
93 case dbObject: DBObject ⇒
96 throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
100 private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
101 val dbObj = new BasicDBObject(1)
102 dbObj.put(name, value)
106 private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
107 val dbObj = _makeDBObject(obj)
108 collection insert dbObj
112 private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport, idName: String, id: String): String = {
113 val cursor = collection.find(_prepareFieldQuery(idName, id))
114 if (!cursor.hasNext) {
115 val errMsg = "Failed to _store %s".format(displayableObjectInfo(obj))
117 throw new StoreException(errMsg)
120 val retval = cursor.next.get("_id").toString
125 private[this] def _store[A <: AquariumEvent](entry: A, col: DBCollection) : Maybe[RecordID] = {
128 val dbObj = _makeDBObject(entry)
131 // Get back to retrieve unique id
132 val cursor = col.find(_prepareFieldQuery("id", entry.id))
134 if (!cursor.hasNext) {
136 logger.error("Failed to _store entry: %s".format(entry))
137 return Failed(new StoreException("Failed to _store entry: %s".format(entry)))
140 val retval = Just(RecordID(cursor.next.get("_id").toString))
144 case m: MongoException =>
145 logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
149 private[this] def _findById[A <: AquariumEvent](id: String, col: DBCollection) : Option[A] = {
150 val q = new BasicDBObject()
153 val cur = col.find(q)
155 val retval = if (cur.hasNext)
156 Some(_deserializeEvent(cur.next))
164 private[this] def _query[A <: AquariumEvent](q: BasicDBObject,
166 (sortWith: Option[(A, A) => Boolean]): List[A] = {
167 val cur = col.find(q)
173 val buff = new ListBuffer[A]()
176 buff += _deserializeEvent(cur.next)
181 case Some(sorter) => buff.toList.sortWith(sorter)
182 case None => buff.toList
186 private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
187 if (one.timestamp > two.timestamp) false
188 else if (one.timestamp < two.timestamp) true
192 private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
193 if (one.timestamp < two.timestamp) false
194 else if (one.timestamp > two.timestamp) true
199 def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = _store(event, events)
201 def findEventById[A <: AquariumEvent](id: String): Option[A] = _findById[A](id, events)
203 def findEventsByUserId[A <: AquariumEvent](userId: String)
204 (sortWith: Option[(A, A) => Boolean]): List[A] = {
205 val q = new BasicDBObject()
206 q.put("userId", userId)
208 _query(q, events)(sortWith)
211 def findEventsByUserIdAfterTimestamp[A <: AquariumEvent](userId: String, timestamp: Long): List[A] = {
212 val query = new BasicDBObject()
213 query.put("userId", userId)
214 query.put("timestamp", "{\"$gte\": %s}".format(timestamp))
216 val sort = new BasicDBObject()
217 sort.put("timestamp", 1)
219 val cursor = events.find(query).sort(sort)
220 val buffer = new scala.collection.mutable.ListBuffer[A]
221 while(cursor.hasNext) {
222 buffer += _deserializeEvent(cursor.next())
233 def storeUserState(userState: UserState): Maybe[RecordID] = {
235 val dbObj = _insertObject(users, userState)
236 val id = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
241 def findUserStateByUserId(userId: String): Maybe[UserState] = {
243 val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
244 val cursor = events.find(queryObj)
246 if(!cursor.hasNext) {
250 val userState = _deserializeUserState(cursor.next())
259 def store(entry: WalletEntry): Maybe[RecordID] = _store(entry, wallets)
261 def findEntryById(id: String): Option[WalletEntry] = _findById[WalletEntry](id, wallets)
263 def findAllUserEntries(userId: String) = findUserEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
265 def findUserEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
266 val q = new BasicDBObject()
267 q.put("timestamp", new BasicDBObject("$gt", from.getTime))
268 q.put("timestamp", new BasicDBObject("$lt", to.getTime))
269 q.put("userId", userId)
271 _query[WalletEntry](q, wallets)(Some(_sortByTimestampAsc))
276 object MongoDBStore {
277 def EVENTS_COLLECTION = "events"
278 def USERS_COLLECTION = "users"
279 def IM_EVENTS_COLLECTION = "imevents"
280 def IM_WALLETS = "wallets"