Capture timestamp semantics with a better name.
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import com.mongodb.util.JSON
41 import gr.grnet.aquarium.user.UserState
42 import gr.grnet.aquarium.util.displayableObjectInfo
43 import gr.grnet.aquarium.util.json.JsonSupport
44 import collection.mutable.{ListBuffer}
45 import gr.grnet.aquarium.store._
46 import gr.grnet.aquarium.logic.events.{WalletEntry, UserEvent, ResourceEvent, AquariumEvent}
47 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
48 import java.util.Date
49 import com.mongodb._
50
51 /**
52  * Mongodb implementation of the event _store (and soon the user _store).
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  */
57 class MongoDBStore(
58     val mongo: Mongo,
59     val database: String,
60     val username: String,
61     val password: String)
62   extends EventStore with UserStore with WalletStore with Loggable {
63
64   private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
65   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66   private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67   private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
68
69   private[this] def getCollection(name: String): DBCollection = {
70     val db = mongo.getDB(database)
71     if(!db.authenticate(username, password.toCharArray)) {
72       throw new StoreException("Could not authenticate user %s".format(username))
73     }
74     db.getCollection(name)
75   }
76
77   /* TODO: Some of the following methods rely on JSON (de-)serialization).
78   * A method based on proper object serialization would be much faster.
79   */
80
81   private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
82     //TODO: Distinguish events and deserialize appropriately
83     ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
84   }
85
86   private[this] def _deserializeUserState(dbObj: DBObject): UserState = {
87     val jsonString = JSON.serialize(dbObj)
88     UserState.fromJson(jsonString)
89   }
90
91   private[this] def _makeDBObject(any: JsonSupport): DBObject = {
92     JSON.parse(any.toJson) match {
93       case dbObject: DBObject ⇒
94         dbObject
95       case _ ⇒
96         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
97     }
98   }
99
100   private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
101     val dbObj = new BasicDBObject(1)
102     dbObj.put(name, value)
103     dbObj
104   }
105
106   private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
107     val dbObj = _makeDBObject(obj)
108     collection insert dbObj
109     dbObj
110   }
111
112   private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport,  idName: String, id: String): String = {
113     val cursor = collection.find(_prepareFieldQuery(idName, id))
114     if (!cursor.hasNext) {
115       val errMsg = "Failed to _store %s".format(displayableObjectInfo(obj))
116       logger.error(errMsg)
117       throw new StoreException(errMsg)
118     }
119
120     val retval = cursor.next.get("_id").toString
121     cursor.close()
122     retval
123   }
124
125   private[this] def _store[A <: AquariumEvent](entry: A, col: DBCollection) : Maybe[RecordID] = {
126     try {
127       // Store
128       val dbObj = _makeDBObject(entry)
129       col.insert(dbObj)
130
131       // Get back to retrieve unique id
132       val cursor = col.find(_prepareFieldQuery(JsonNames.id, entry.id))
133
134       if (!cursor.hasNext) {
135         cursor.close()
136         logger.error("Failed to _store entry: %s".format(entry))
137         return Failed(new StoreException("Failed to _store entry: %s".format(entry)))
138       }
139
140       val retval = Just(RecordID(cursor.next.get(JsonNames._id).toString))
141       cursor.close()
142       retval
143     } catch {
144       case m: MongoException =>
145         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
146     }
147   }
148
149   private[this] def _findById[A <: AquariumEvent](id: String, col: DBCollection) : Option[A] = {
150     val q = new BasicDBObject()
151     q.put(JsonNames.id, id)
152
153     val cur = col.find(q)
154
155     val retval = if (cur.hasNext)
156       Some(_deserializeEvent(cur.next))
157     else
158       None
159     
160     cur.close()
161     retval
162   }
163   
164   private[this] def _query[A <: AquariumEvent](q: BasicDBObject,
165                                               col: DBCollection)
166                                               (sortWith: Option[(A, A) => Boolean]): List[A] = {
167     val cur = col.find(q)
168     if (!cur.hasNext) {
169       cur.close()
170       return List()
171     }
172
173     val buff = new ListBuffer[A]()
174
175     while(cur.hasNext)
176       buff += _deserializeEvent(cur.next)
177
178     cur.close()
179     
180     sortWith match {
181       case Some(sorter) => buff.toList.sortWith(sorter)
182       case None => buff.toList
183     }
184   }
185
186   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
187     if (one.occurredMillis > two.occurredMillis) false
188     else if (one.occurredMillis < two.occurredMillis) true
189     else true
190   }
191
192   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
193     if (one.occurredMillis < two.occurredMillis) false
194     else if (one.occurredMillis > two.occurredMillis) true
195     else true
196   }
197
198   //+EventStore
199   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = _store(event, events)
200
201   def findEventById[A <: AquariumEvent](id: String): Option[A] = _findById[A](id, events)
202
203   def findEventsByUserId[A <: AquariumEvent](userId: String)
204                                             (sortWith: Option[(A, A) => Boolean]): List[A] = {
205     val q = new BasicDBObject()
206     q.put(JsonNames.userId, userId)
207
208     _query(q, events)(sortWith)
209   }
210
211   def findEventsByUserIdAfterTimestamp[A <: AquariumEvent](userId: String, timestamp: Long): List[A] = {
212     val query = new BasicDBObject()
213     query.put(JsonNames.userId, userId)
214     query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp))
215     
216     val sort = new BasicDBObject(JsonNames.timestamp, 1)
217
218     val cursor = events.find(query).sort(sort)
219     val buffer = new scala.collection.mutable.ListBuffer[A]
220     while(cursor.hasNext) {
221       buffer += _deserializeEvent(cursor.next())
222     }
223
224     cursor.close()
225
226     buffer.toList
227   }
228   //-EventStore
229
230   //+UserStore
231
232   def storeUserState(userState: UserState): Maybe[RecordID] = {
233     Maybe {
234       val dbObj = _insertObject(users, userState)
235       val id    = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
236       RecordID(id)
237     }
238   }
239
240   def findUserStateByUserId(userId: String): Maybe[UserState] = {
241     Maybe {
242       val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
243       val cursor = events.find(queryObj)
244
245       if(!cursor.hasNext) {
246         cursor.close()
247         null
248       } else {
249         val userState = _deserializeUserState(cursor.next())
250         cursor.close()
251         userState
252       }
253     }
254   }
255   //-UserStore
256
257   //+WalletStore
258   def store(entry: WalletEntry): Maybe[RecordID] = _store(entry, wallets)
259
260   def findEntryById(id: String): Option[WalletEntry] = _findById[WalletEntry](id, wallets)
261
262   def findAllUserEntries(userId: String) = findUserEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
263
264   def findUserEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
265     val q = new BasicDBObject()
266     q.put(JsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
267     q.put(JsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
268     q.put(JsonNames.userId, userId)
269
270     _query[WalletEntry](q, wallets)(Some(_sortByTimestampAsc))
271   }
272   //-WalletStore
273 }
274
275 object MongoDBStore {
276   def EVENTS_COLLECTION = "events"
277   def USERS_COLLECTION = "users"
278   def IM_EVENTS_COLLECTION = "imevents"
279   def IM_WALLETS = "wallets"
280 }