2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.util.displayableObjectInfo
42 import gr.grnet.aquarium.util.json.JsonSupport
43 import collection.mutable.ListBuffer
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
46 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
48 import com.ckkloverdos.maybe.Maybe
52 * MongoDB implementation of the various aquarium stores.
54 * @author Christos KK Loverdos <loverdos@gmail.com>
55 * @author Georgios Gousios <gousiosg@gmail.com>
62 extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
// Handles to the MongoDB collections backing this store. Each one is a lazy val, so the
// getCollection call (including the authentication it performs) runs on first access
// instead of at construction time.
// Resource events (ResourceEventStore methods).
64 private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
// User states (written by storeUserState).
65 private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
// NOTE(review): imevents is not referenced by any method visible in this file section.
66 private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
// Wallet entries (WalletEntryStore methods).
67 private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
/**
 * Looks up a collection of the configured database by name.
 *
 * The database handle is obtained from `mongo` and the configured credentials are
 * checked on every call, before the collection is handed out.
 *
 * @param name the name of the collection to resolve
 * @return the collection called `name`
 * @throws StoreException if authentication with `username`/`password` is refused
 */
private[this] def getCollection(name: String): DBCollection = {
  val db = mongo.getDB(database)
  val authenticated = db.authenticate(username, password.toCharArray)

  if(authenticated) {
    db.getCollection(name)
  } else {
    throw new StoreException("Could not authenticate user %s".format(username))
  }
}
77 /* TODO: Some of the following methods rely on JSON (de-)serialization.
78 * A method based on proper object serialization would be much faster.
/**
 * "Less than" predicate ordering events by ascending `occurredMillis`.
 *
 * This is meant to be handed to `sortWith`, which expects a strict ordering:
 * two events with the same timestamp must NOT be reported as "less than" each
 * other in either direction, otherwise the derived comparator is inconsistent.
 * Ties therefore yield `false`.
 *
 * @param one the candidate "earlier" event
 * @param two the event it is compared against
 * @return `true` only if `one` occurred strictly before `two`
 */
private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean =
  one.occurredMillis < two.occurredMillis
/**
 * "Less than" predicate ordering events by descending `occurredMillis`.
 *
 * This is meant to be handed to `sortWith`, which expects a strict ordering:
 * two events with the same timestamp must NOT be reported as preceding each
 * other in either direction, otherwise the derived comparator is inconsistent.
 * Ties therefore yield `false`.
 *
 * @param one the candidate "later" event
 * @param two the event it is compared against
 * @return `true` only if `one` occurred strictly after `two`
 */
private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean =
  one.occurredMillis > two.occurredMillis
/**
 * Stores a resource event in the resource events collection.
 *
 * @param event the event to persist
 * @return whatever `MongoDBStore.storeAquariumEvent` reports for the insertion
 */
def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] = {
  MongoDBStore.storeAquariumEvent[ResourceEvent](event, rcevents)
}
/**
 * Looks up a single resource event by its id in the resource events collection.
 *
 * @param id the event id to search for
 * @return the result of `MongoDBStore.findById` using the resource event deserializer
 */
def findResourceEventById(id: String): Maybe[ResourceEvent] = {
  val toResourceEvent: DBObject => ResourceEvent = MongoDBStore.dbObjectToResourceEvent
  MongoDBStore.findById[ResourceEvent](id, rcevents, toResourceEvent)
}
/**
 * Fetches every resource event recorded for a user.
 *
 * @param userId   the user whose events are requested
 * @param sortWith optional "less than" function; when given, the result is sorted with it
 * @return the matching events, as produced by `MongoDBStore.runQuery`
 */
def findResourceEventsByUserId(userId: String)
                              (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
  val byUser = new BasicDBObject(JsonNames.userId, userId)
  val toResourceEvent: DBObject => ResourceEvent = MongoDBStore.dbObjectToResourceEvent

  MongoDBStore.runQuery[ResourceEvent](byUser, rcevents)(toResourceEvent)(sortWith)
}
/**
 * Fetches the resource events of a user whose timestamp is at or after `timestamp`.
 *
 * The database is asked to return the events sorted by timestamp in ascending
 * order. The cursor is always released, even if deserializing one of the
 * documents throws.
 *
 * @param userId    the user whose events are requested
 * @param timestamp inclusive lower bound (`$gte`) for the event timestamp
 * @return the matching events in ascending timestamp order; empty if none match
 */
def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
  val query = new BasicDBObject()
  query.put(JsonNames.userId, userId)
  query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp))

  // 1 requests ascending order
  val sort = new BasicDBObject(JsonNames.timestamp, 1)
  val cursor = rcevents.find(query).sort(sort)

  try {
    val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
    while(cursor.hasNext) {
      buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
    }
    buffer.toList
  } finally {
    cursor.close()
  }
}
126 //-ResourceEventStore
/**
 * Stores a user state in the users collection.
 *
 * @param userState the state to persist
 * @return whatever `MongoDBStore.storeUserState` reports for the insertion
 */
def storeUserState(userState: UserState): Maybe[RecordID] = {
  MongoDBStore.storeUserState(userState, users)
}
/**
 * Looks up the stored state of a user.
 *
 * User states are written to the `users` collection by `storeUserState`, so that
 * is the collection searched here (searching the resource events collection can
 * never return a user state).
 *
 * @param userId the user whose state is requested
 * @return the first matching user state; when nothing matches the block yields
 *         `null`, which the `Maybe` constructor turns into its "no value" case
 */
def findUserStateByUserId(userId: String): Maybe[UserState] = Maybe {
  val query = new BasicDBObject(JsonNames.userId, userId)
  val cursor = users find query

  try {
    if(cursor.hasNext)
      MongoDBStore.dbObjectToUserState(cursor.next())
    else
      null: UserState
  } finally {
    cursor.close()
  }
}
/**
 * Stores a wallet entry in the wallets collection.
 *
 * @param entry the wallet entry to persist
 * @return whatever `MongoDBStore.storeAquariumEvent` reports for the insertion
 */
def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
  MongoDBStore.storeAquariumEvent[WalletEntry](entry, wallets)
}
/**
 * Looks up a single wallet entry by its id in the wallets collection.
 *
 * @param id the wallet entry id to search for
 * @return the result of `MongoDBStore.findById` using the wallet entry deserializer
 */
def findWalletEntryById(id: String): Maybe[WalletEntry] = {
  val toWalletEntry: DBObject => WalletEntry = MongoDBStore.dbObjectToWalletEntry
  MongoDBStore.findById[WalletEntry](id, wallets, toWalletEntry)
}
/**
 * Fetches all wallet entries of a user, regardless of when they occurred.
 *
 * Implemented as a range query over the widest possible interval. The upper
 * bound must be `Long.MaxValue`: `Date` takes milliseconds, and `Int.MaxValue`
 * milliseconds is only about 25 days after the epoch, which would exclude
 * practically every real entry.
 *
 * @param userId the user whose wallet entries are requested
 * @return the user's wallet entries, as returned by `findUserWalletEntriesFromTo`
 */
def findUserWalletEntries(userId: String): List[WalletEntry] = {
  findUserWalletEntriesFromTo(userId, new Date(0), new Date(Long.MaxValue))
}
/**
 * Fetches the wallet entries of a user whose timestamp lies strictly between
 * `from` and `to`.
 *
 * Both bounds have to live in ONE condition object for the timestamp field.
 * Putting the field twice into the query object simply replaces the first
 * condition with the second, leaving only the upper bound in effect.
 *
 * @param userId the user whose wallet entries are requested
 * @param from   exclusive lower bound (`$gt`) of the timestamp
 * @param to     exclusive upper bound (`$lt`) of the timestamp
 * @return the matching entries, sorted by ascending timestamp
 */
def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
  // { $gt: from, $lt: to } -- a single range condition on the timestamp field
  val timestampRange = new BasicDBObject("$gt", from.getTime).append("$lt", to.getTime)

  val q = new BasicDBObject()
  q.put(JsonNames.timestamp, timestampRange)
  q.put(JsonNames.userId, userId)

  MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
}
// Collects the most recent wallet entries: the wallets cursor is sorted by occurredMillis in
// descending order, the first entry read is added to the buffer, and the loop keeps reading
// while the next entry has the same occurredMillis as that first one.
// NOTE(review): the visible query is wallets.find() with no criteria, so `userId` does not
// appear to restrict the result -- confirm whether a userId filter is missing here.
// NOTE(review): the result type is inferred; it cannot be determined from the lines shown.
173 def findLatestUserWalletEntries(userId: String) = {
175 val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
176 val cursor = wallets.find().sort(orderBy)
179 val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
// First (i.e. latest) entry; its timestamp is the reference value for the loop below.
181 val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
182 buffer += walletEntry
184 var _previousOccurredMillis = walletEntry.occurredMillis
// Stop as soon as an entry with a different timestamp is seen (_ok becomes false).
187 while(cursor.hasNext && _ok) {
188 val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
189 var currentOccurredMillis = walletEntry.occurredMillis
190 _ok = currentOccurredMillis == _previousOccurredMillis
// presumably only reached when _ok is still true -- the guard is not visible here; verify
193 buffer += walletEntry
/**
 * Collection names and the shared query/insert helpers used by the MongoDB store.
 *
 * Every helper that opens a cursor releases it in a `finally` block, so a failing
 * deserializer cannot leak the cursor.
 */
object MongoDBStore {
  final val RESOURCE_EVENTS_COLLECTION = "resevents"
  final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
  final val USERS_COLLECTION = "users"
  final val IM_EVENTS_COLLECTION = "imevents"
  final val IM_WALLETS = "wallets"

  /** Converts a database object to a [[ResourceEvent]] by going through its JSON text form. */
  def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
    ResourceEvent.fromJson(JSON.serialize(dbObject))
  }

  /** Converts a database object to a [[UserState]] by going through its JSON text form. */
  def dbObjectToUserState(dbObj: DBObject): UserState = {
    UserState.fromJson(JSON.serialize(dbObj))
  }

  /** Converts a database object to a [[WalletEntry]] by going through its JSON text form. */
  def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
    WalletEntry.fromJson(JSON.serialize(dbObj))
  }

  /**
   * Finds the first document of `collection` whose `JsonNames.id` field equals `id`.
   *
   * @param id           the id value to search for
   * @param collection   the collection to search
   * @param deserializer converts the found document to an `A`
   * @return the deserialized document; when nothing matches, the block yields `null`,
   *         which the `Maybe` constructor turns into its "no value" case
   */
  def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
    val query = new BasicDBObject(JsonNames.id, id)
    val cursor = collection find query

    try {
      if(cursor.hasNext)
        deserializer apply cursor.next()
      else
        null: A // will be transformed to NoVal by the Maybe polymorphic constructor
    } finally {
      cursor.close()
    }
  }

  /**
   * Runs `query` against `collection` and deserializes every returned document.
   *
   * @param query        the query object
   * @param collection   the collection to search
   * @param orderBy      optional database-side sort specification; `null` means "no sort"
   * @param deserializer converts each document to an `A`
   * @param sortWith     optional in-memory "less than" function applied to the final list
   * @return all matching items; `Nil` when the query matches nothing
   */
  def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
                                  (deserializer: (DBObject) => A)
                                  (sortWith: Option[(A, A) => Boolean]): List[A] = {
    val cursor0 = collection find query
    // The docs say sort() hands back the same cursor; the returned value is used regardless.
    val cursor = if(orderBy ne null) cursor0 sort orderBy else cursor0

    // Drain the cursor first and release it no matter what the deserializer does.
    val items = try {
      val buff = new ListBuffer[A]()
      while(cursor.hasNext) {
        buff += deserializer apply cursor.next()
      }
      buff.toList
    } finally {
      cursor.close()
    }

    sortWith match {
      case Some(sorter) => items.sortWith(sorter)
      case None => items
    }
  }

  /** Stores an event, using its `id` (field `JsonNames.id`) to find the stored record again. */
  def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
    storeAny[A](event, collection, JsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
  }

  /** Stores a user state, using its `userId` (field `JsonNames.userId`) to find the stored record again. */
  def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
    storeAny[UserState](userState, collection, JsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
  }

  /**
   * Inserts `any` into `collection` and returns the `_id` of the stored document.
   *
   * @param any             the object to store
   * @param collection      the target collection
   * @param idName          name of the field used to find the document after the insert
   * @param idValueProvider extracts the value of that field from `any`
   * @param serializer      converts `any` to the document that is inserted
   * @return the record id built from the stored document's `_id`
   * @throws StoreException (inside the `Maybe` block) if the document cannot be read back;
   *                        a failed write is reported through `throwOnError()`
   */
  def storeAny[A](any: A,
                  collection: DBCollection,
                  idName: String,
                  idValueProvider: (A) => String,
                  serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
    // Store
    val dbObj = serializer apply any
    val writeResult = collection insert dbObj
    writeResult.getLastError().throwOnError()

    // Get back to retrieve unique id
    val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))

    try {
      // TODO: better way to get _id?
      if(cursor.hasNext)
        RecordID(cursor.next().get(JsonNames._id).toString)
      else
        throw new StoreException("Could not store %s to %s".format(any, collection))
    } finally {
      cursor.close()
    }
  }

  /**
   * Converts an object with JSON support to a database object by parsing its JSON text.
   *
   * @param any the object to convert
   * @return the parsed database object
   * @throws StoreException if parsing the JSON does not yield a `DBObject`
   */
  def jsonSupportToDBObject(any: JsonSupport): DBObject = {
    JSON.parse(any.toJson) match {
      case dbObject: DBObject ⇒
        dbObject
      case _ ⇒
        throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
    }
  }
}