747c9ba77e3942fa9c56fad1409003fc86fd50b7
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.util.displayableObjectInfo
42 import gr.grnet.aquarium.util.json.JsonSupport
43 import collection.mutable.ListBuffer
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
46 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
47 import java.util.Date
48 import com.mongodb._
49 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
50
51 /**
52  * Mongodb implementation of the various aquarium stores.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  */
57 class MongoDBStore(
58     val mongo: Mongo,
59     val database: String,
60     val username: String,
61     val password: String)
62   extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
63
64   private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
65   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66   private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67   private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
68
69   private[this] def getCollection(name: String): DBCollection = {
70     val db = mongo.getDB(database)
71     if(!db.authenticate(username, password.toCharArray)) {
72       throw new StoreException("Could not authenticate user %s".format(username))
73     }
74     db.getCollection(name)
75   }
76
77   /* TODO: Some of the following methods rely on JSON (de-)serialization).
78   * A method based on proper object serialization would be much faster.
79   */
80
81   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
82     if (one.occurredMillis > two.occurredMillis) false
83     else if (one.occurredMillis < two.occurredMillis) true
84     else true
85   }
86
87   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
88     if (one.occurredMillis < two.occurredMillis) false
89     else if (one.occurredMillis > two.occurredMillis) true
90     else true
91   }
92
93   //+ResourceEventStore
94   def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
95     MongoDBStore.storeAquariumEvent(event, rcevents)
96
97   def findResourceEventById(id: String): Maybe[ResourceEvent] =
98     MongoDBStore.findById(id, rcevents, MongoDBStore.dbObjectToResourceEvent)
99
100   def findResourceEventsByUserId(userId: String)
101                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
102     val query = new BasicDBObject(JsonNames.userId, userId)
103
104     MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
105   }
106
107   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
108     val query = new BasicDBObject()
109     query.put(JsonNames.userId, userId)
110     query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp))
111     
112     val sort = new BasicDBObject(JsonNames.timestamp, 1)
113
114     val cursor = rcevents.find(query).sort(sort)
115
116     try {
117       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
118       while(cursor.hasNext) {
119         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
120       }
121       buffer.toList
122     } finally {
123       cursor.close()
124     }
125   }
126   //-ResourceEventStore
127
128   //+UserStateStore
129   def storeUserState(userState: UserState): Maybe[RecordID] =
130     MongoDBStore.storeUserState(userState, users)
131
132   def findUserStateByUserId(userId: String): Maybe[UserState] = {
133     Maybe {
134       val query = new BasicDBObject(JsonNames.userId, userId)
135       val cursor = rcevents find query
136
137       try {
138         if(cursor.hasNext)
139           MongoDBStore.dbObjectToUserState(cursor.next())
140         else
141           null
142       } finally {
143         cursor.close()
144       }
145     }
146   }
147   //-UserStateStore
148
149   //+WalletEntryStore
150   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
151     MongoDBStore.storeAquariumEvent(entry, wallets)
152
153   def findWalletEntryById(id: String): Maybe[WalletEntry] =
154     MongoDBStore.findById[WalletEntry](id, wallets, MongoDBStore.dbObjectToWalletEntry)
155
156   def findUserWalletEntries(userId: String) = {
157     // TODO: optimize
158     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
159   }
160
161   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
162     val q = new BasicDBObject()
163     // TODO: Is this the correct way for an AND query?
164     q.put(JsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
165     q.put(JsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
166     q.put(JsonNames.userId, userId)
167
168     MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
169   }
170   //-WalletEntryStore
171 }
172
173 object MongoDBStore {
174   def RESOURCE_EVENTS_COLLECTION = "resevents"
175   def USERS_COLLECTION = "users"
176   def IM_EVENTS_COLLECTION = "imevents"
177   def IM_WALLETS = "wallets"
178
179   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
180     ResourceEvent.fromJson(JSON.serialize(dbObject))
181   }
182
183   def dbObjectToUserState(dbObj: DBObject): UserState = {
184     UserState.fromJson(JSON.serialize(dbObj))
185   }
186
187   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
188     WalletEntry.fromJson(JSON.serialize(dbObj))
189   }
190
191   def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
192     val query = new BasicDBObject(JsonNames.id, id)
193     val cursor = collection find query
194
195     try {
196       if(cursor.hasNext)
197         deserializer apply cursor.next
198       else
199         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
200     } finally {
201       cursor.close()
202     }
203   }
204
205   def runQuery[A <: AquariumEvent](query: BasicDBObject, collection: DBCollection)
206                                   (deserializer: (DBObject) => A)
207                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
208     val cur = collection find query
209     if(!cur.hasNext) {
210       cur.close()
211       Nil
212     } else {
213       val buff = new ListBuffer[A]()
214
215       while(cur.hasNext) {
216         buff += deserializer apply cur.next
217       }
218
219       cur.close()
220
221       sortWith match {
222         case Some(sorter) => buff.toList.sortWith(sorter)
223         case None => buff.toList
224       }
225     }
226   }
227
228   def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
229     storeAny[A](event, collection, JsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
230   }
231
232   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
233     storeAny[UserState](userState, collection, JsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
234   }
235
236   def storeAny[A](any: A,
237                   collection: DBCollection,
238                   idName: String,
239                   idValueProvider: (A) => String,
240                   serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
241     // Store
242     val dbObj = serializer apply any
243     val writeResult = collection insert dbObj
244     writeResult.getLastError().throwOnError()
245
246     // Get back to retrieve unique id
247     val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
248
249     try {
250       // TODO: better way to get _id?
251       if(cursor.hasNext)
252         RecordID(cursor.next().get(JsonNames._id).toString)
253       else
254         throw new StoreException("Could not store %s to %s".format(any, collection))
255     } finally {
256       cursor.close()
257     }
258  }
259
260   def jsonSupportToDBObject(any: JsonSupport): DBObject = {
261     JSON.parse(any.toJson) match {
262       case dbObject: DBObject ⇒
263         dbObject
264       case _ ⇒
265         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
266     }
267   }
268 }