c7c008f38762e97413983d58b73f314751893433
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.util.displayableObjectInfo
42 import gr.grnet.aquarium.util.json.JsonSupport
43 import collection.mutable.ListBuffer
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
46 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
47 import java.util.Date
48 import com.ckkloverdos.maybe.Maybe
49 import com.mongodb._
50
51 /**
52  * Mongodb implementation of the various aquarium stores.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  */
57 class MongoDBStore(
58     val mongo: Mongo,
59     val database: String,
60     val username: String,
61     val password: String)
62   extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
63
64   private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
65   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66   private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67   private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
68
69   private[this] def getCollection(name: String): DBCollection = {
70     val db = mongo.getDB(database)
71     if(!db.authenticate(username, password.toCharArray)) {
72       throw new StoreException("Could not authenticate user %s".format(username))
73     }
74     db.getCollection(name)
75   }
76
77   /* TODO: Some of the following methods rely on JSON (de-)serialization).
78   * A method based on proper object serialization would be much faster.
79   */
80
81   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
82     if (one.occurredMillis > two.occurredMillis) false
83     else if (one.occurredMillis < two.occurredMillis) true
84     else true
85   }
86
87   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
88     if (one.occurredMillis < two.occurredMillis) false
89     else if (one.occurredMillis > two.occurredMillis) true
90     else true
91   }
92
93   //+ResourceEventStore
94   def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
95     MongoDBStore.storeAquariumEvent(event, rcevents)
96
97   def findResourceEventById(id: String): Maybe[ResourceEvent] =
98     MongoDBStore.findById(id, rcevents, MongoDBStore.dbObjectToResourceEvent)
99
100   def findResourceEventsByUserId(userId: String)
101                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
102     val query = new BasicDBObject(JsonNames.userId, userId)
103
104     MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
105   }
106
107   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
108     val query = new BasicDBObject()
109     query.put(JsonNames.userId, userId)
110     query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp))
111     
112     val sort = new BasicDBObject(JsonNames.timestamp, 1)
113
114     val cursor = rcevents.find(query).sort(sort)
115
116     try {
117       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
118       while(cursor.hasNext) {
119         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
120       }
121       buffer.toList
122     } finally {
123       cursor.close()
124     }
125   }
126   //-ResourceEventStore
127
128   //+UserStateStore
129   def storeUserState(userState: UserState): Maybe[RecordID] =
130     MongoDBStore.storeUserState(userState, users)
131
132   def findUserStateByUserId(userId: String): Maybe[UserState] = {
133     Maybe {
134       val query = new BasicDBObject(JsonNames.userId, userId)
135       val cursor = rcevents find query
136
137       try {
138         if(cursor.hasNext)
139           MongoDBStore.dbObjectToUserState(cursor.next())
140         else
141           null
142       } finally {
143         cursor.close()
144       }
145     }
146   }
147   //-UserStateStore
148
149   //+WalletEntryStore
150   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
151     MongoDBStore.storeAquariumEvent(entry, wallets)
152
153   def findWalletEntryById(id: String): Maybe[WalletEntry] =
154     MongoDBStore.findById[WalletEntry](id, wallets, MongoDBStore.dbObjectToWalletEntry)
155
156   def findUserWalletEntries(userId: String) = {
157     // TODO: optimize
158     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
159   }
160
161   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
162     val q = new BasicDBObject()
163     // TODO: Is this the correct way for an AND query?
164     q.put(JsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
165     q.put(JsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
166     q.put(JsonNames.userId, userId)
167
168     MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
169   }
170
171
172   //-WalletEntryStore
173   def findLatestUserWalletEntries(userId: String) = {
174     Maybe {
175       val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
176       val cursor = wallets.find().sort(orderBy)
177
178       try {
179         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
180         if(cursor.hasNext) {
181           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
182           buffer += walletEntry
183
184           var _previousOccurredMillis = walletEntry.occurredMillis
185           var _ok = true
186
187           while(cursor.hasNext && _ok) {
188             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
189             var currentOccurredMillis = walletEntry.occurredMillis
190             _ok = currentOccurredMillis == _previousOccurredMillis
191             
192             if(_ok) {
193               buffer += walletEntry
194             }
195           }
196
197           buffer.toList
198         } else {
199           null
200         }
201       } finally {
202         cursor.close()
203       }
204     }
205   }
206 }
207
208 object MongoDBStore {
209   final val RESOURCE_EVENTS_COLLECTION = "resevents"
210   final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
211   final val USERS_COLLECTION = "users"
212   final val IM_EVENTS_COLLECTION = "imevents"
213   final val IM_WALLETS = "wallets"
214
215   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
216     ResourceEvent.fromJson(JSON.serialize(dbObject))
217   }
218
219   def dbObjectToUserState(dbObj: DBObject): UserState = {
220     UserState.fromJson(JSON.serialize(dbObj))
221   }
222
223   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
224     WalletEntry.fromJson(JSON.serialize(dbObj))
225   }
226
227   def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
228     val query = new BasicDBObject(JsonNames.id, id)
229     val cursor = collection find query
230
231     try {
232       if(cursor.hasNext)
233         deserializer apply cursor.next
234       else
235         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
236     } finally {
237       cursor.close()
238     }
239   }
240
241   def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
242                                   (deserializer: (DBObject) => A)
243                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
244     val cursor0 = collection find query
245     val cursor = if(orderBy ne null) {
246       cursor0 sort orderBy
247     } else {
248       cursor0
249     } // I really know that docs say that it is the same cursor.
250
251     if(!cursor.hasNext) {
252       cursor.close()
253       Nil
254     } else {
255       val buff = new ListBuffer[A]()
256
257       while(cursor.hasNext) {
258         buff += deserializer apply cursor.next
259       }
260
261       cursor.close()
262
263       sortWith match {
264         case Some(sorter) => buff.toList.sortWith(sorter)
265         case None => buff.toList
266       }
267     }
268   }
269
270   def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
271     storeAny[A](event, collection, JsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
272   }
273
274   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
275     storeAny[UserState](userState, collection, JsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
276   }
277
278   def storeAny[A](any: A,
279                   collection: DBCollection,
280                   idName: String,
281                   idValueProvider: (A) => String,
282                   serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
283     // Store
284     val dbObj = serializer apply any
285     val writeResult = collection insert dbObj
286     writeResult.getLastError().throwOnError()
287
288     // Get back to retrieve unique id
289     val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
290
291     try {
292       // TODO: better way to get _id?
293       if(cursor.hasNext)
294         RecordID(cursor.next().get(JsonNames._id).toString)
295       else
296         throw new StoreException("Could not store %s to %s".format(any, collection))
297     } finally {
298       cursor.close()
299     }
300  }
301
302   def jsonSupportToDBObject(any: JsonSupport): DBObject = {
303     JSON.parse(any.toJson) match {
304       case dbObject: DBObject ⇒
305         dbObject
306       case _ ⇒
307         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
308     }
309   }
310 }