2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.util.displayableObjectInfo
42 import gr.grnet.aquarium.util.json.JsonSupport
43 import collection.mutable.ListBuffer
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
46 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
49 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
52 * Mongodb implementation of the various aquarium stores.
54 * @author Christos KK Loverdos <loverdos@gmail.com>
55 * @author Georgios Gousios <gousiosg@gmail.com>
62 extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
64 private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
65 private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66 private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67 private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
69 private[this] def getCollection(name: String): DBCollection = {
70 val db = mongo.getDB(database)
71 if(!db.authenticate(username, password.toCharArray)) {
72 throw new StoreException("Could not authenticate user %s".format(username))
74 db.getCollection(name)
77 /* TODO: Some of the following methods rely on JSON (de-)serialization).
78 * A method based on proper object serialization would be much faster.
81 private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
82 if (one.occurredMillis > two.occurredMillis) false
83 else if (one.occurredMillis < two.occurredMillis) true
87 private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
88 if (one.occurredMillis < two.occurredMillis) false
89 else if (one.occurredMillis > two.occurredMillis) true
94 def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
95 MongoDBStore.storeAquariumEvent(event, rcevents)
97 def findResourceEventById(id: String): Maybe[ResourceEvent] =
98 MongoDBStore.findById(id, rcevents, MongoDBStore.dbObjectToResourceEvent)
100 def findResourceEventsByUserId(userId: String)
101 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
102 val query = new BasicDBObject(JsonNames.userId, userId)
104 MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
107 def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
108 val query = new BasicDBObject()
109 query.put(JsonNames.userId, userId)
110 query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp))
112 val sort = new BasicDBObject(JsonNames.timestamp, 1)
114 val cursor = rcevents.find(query).sort(sort)
117 val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
118 while(cursor.hasNext) {
119 buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
126 //-ResourceEventStore
129 def storeUserState(userState: UserState): Maybe[RecordID] =
130 MongoDBStore.storeUserState(userState, users)
132 def findUserStateByUserId(userId: String): Maybe[UserState] = {
134 val query = new BasicDBObject(JsonNames.userId, userId)
135 val cursor = rcevents find query
139 MongoDBStore.dbObjectToUserState(cursor.next())
150 def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
151 MongoDBStore.storeAquariumEvent(entry, wallets)
153 def findWalletEntryById(id: String): Maybe[WalletEntry] =
154 MongoDBStore.findById[WalletEntry](id, wallets, MongoDBStore.dbObjectToWalletEntry)
156 def findUserWalletEntries(userId: String) = {
158 findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
161 def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
162 val q = new BasicDBObject()
163 // TODO: Is this the correct way for an AND query?
164 q.put(JsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
165 q.put(JsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
166 q.put(JsonNames.userId, userId)
168 MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
173 object MongoDBStore {
174 def RESOURCE_EVENTS_COLLECTION = "resevents"
175 def USERS_COLLECTION = "users"
176 def IM_EVENTS_COLLECTION = "imevents"
177 def IM_WALLETS = "wallets"
179 def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
180 ResourceEvent.fromJson(JSON.serialize(dbObject))
183 def dbObjectToUserState(dbObj: DBObject): UserState = {
184 UserState.fromJson(JSON.serialize(dbObj))
187 def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
188 WalletEntry.fromJson(JSON.serialize(dbObj))
191 def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
192 val query = new BasicDBObject(JsonNames.id, id)
193 val cursor = collection find query
197 deserializer apply cursor.next
199 null: A // will be transformed to NoVal by the Maybe polymorphic constructor
205 def runQuery[A <: AquariumEvent](query: BasicDBObject, collection: DBCollection)
206 (deserializer: (DBObject) => A)
207 (sortWith: Option[(A, A) => Boolean]): List[A] = {
208 val cur = collection find query
213 val buff = new ListBuffer[A]()
216 buff += deserializer apply cur.next
222 case Some(sorter) => buff.toList.sortWith(sorter)
223 case None => buff.toList
228 def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
229 storeAny[A](event, collection, JsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
232 def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
233 storeAny[UserState](userState, collection, JsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
236 def storeAny[A](any: A,
237 collection: DBCollection,
239 idValueProvider: (A) => String,
240 serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
242 val dbObj = serializer apply any
243 val writeResult = collection insert dbObj
244 writeResult.getLastError().throwOnError()
246 // Get back to retrieve unique id
247 val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
250 // TODO: better way to get _id?
252 RecordID(cursor.next().get(JsonNames._id).toString)
254 throw new StoreException("Could not store %s to %s".format(any, collection))
260 def jsonSupportToDBObject(any: JsonSupport): DBObject = {
261 JSON.parse(any.toJson) match {
262 case dbObject: DBObject ⇒
265 throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))