Flat project hierarchy
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import com.mongodb.util.JSON
41 import gr.grnet.aquarium.user.UserState
42 import gr.grnet.aquarium.user.UserState.JsonNames
43 import gr.grnet.aquarium.util.displayableObjectInfo
44 import gr.grnet.aquarium.util.json.JsonSupport
45 import collection.mutable.{ListBuffer}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.logic.events.{WalletEntry, UserEvent, ResourceEvent, AquariumEvent}
48 import java.util.Date
49 import com.mongodb._
50
51 /**
52  * Mongodb implementation of the event _store (and soon the user _store).
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  */
57 class MongoDBStore(
58     val mongo: Mongo,
59     val database: String,
60     val username: String,
61     val password: String)
62   extends EventStore with UserStore
63   with IMStore with WalletStore with Loggable {
64
65   private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
66   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
67   private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
68   private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
69
70   private[this] def getCollection(name: String): DBCollection = {
71     val db = mongo.getDB(database)
72     if(!db.authenticate(username, password.toCharArray)) {
73       throw new StoreException("Could not authenticate user %s".format(username))
74     }
75     db.getCollection(name)
76   }
77
78   /* TODO: Some of the following methods rely on JSON (de-)serialization).
79   * A method based on proper object serialization would be much faster.
80   */
81
82   private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
83     //TODO: Distinguish events and deserialize appropriately
84     ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
85   }
86
87   private[this] def _deserializeUserState(dbObj: DBObject): UserState = {
88     val jsonString = JSON.serialize(dbObj)
89     UserState.fromJson(jsonString)
90   }
91
92   private[this] def _makeDBObject(any: JsonSupport): DBObject = {
93     JSON.parse(any.toJson) match {
94       case dbObject: DBObject ⇒
95         dbObject
96       case _ ⇒
97         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
98     }
99   }
100
101   private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
102     val dbObj = new BasicDBObject(1)
103     dbObj.put(name, value)
104     dbObj
105   }
106
107   private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
108     val dbObj = _makeDBObject(obj)
109     collection insert dbObj
110     dbObj
111   }
112
113   private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport,  idName: String, id: String): String = {
114     val cursor = collection.find(_prepareFieldQuery(idName, id))
115     if (!cursor.hasNext) {
116       val errMsg = "Failed to _store %s".format(displayableObjectInfo(obj))
117       logger.error(errMsg)
118       throw new StoreException(errMsg)
119     }
120
121     val retval = cursor.next.get("_id").toString
122     cursor.close()
123     retval
124   }
125
126   private[this] def _store[A <: AquariumEvent](entry: A, col: DBCollection) : Maybe[RecordID] = {
127     try {
128       // Store
129       val dbObj = _makeDBObject(entry)
130       col.insert(dbObj)
131
132       // Get back to retrieve unique id
133       val cursor = col.find(_prepareFieldQuery("id", entry.id))
134
135       if (!cursor.hasNext) {
136         logger.error("Failed to _store entry: %s".format(entry))
137         return Failed(new StoreException("Failed to _store entry: %s".format(entry)))
138       }
139
140       Just(RecordID(cursor.next.get("_id").toString))
141     } catch {
142       case m: MongoException =>
143         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
144     }
145   }
146
147   private[this] def _findById[A <: AquariumEvent](id: String, col: DBCollection) : Option[A] = {
148     val q = new BasicDBObject()
149     q.put("id", id)
150
151     val cur = col.find(q)
152
153     if (cur.hasNext)
154       Some(_deserializeEvent(cur.next))
155     else
156       None
157   }
158   
159   private[this] def _query[A <: AquariumEvent](q: BasicDBObject,
160                                               col: DBCollection)
161                                               (sortWith: Option[(A, A) => Boolean]): List[A] = {
162     val cur = col.find(q)
163     if (!cur.hasNext)
164       return List()
165
166     val buff = new ListBuffer[A]()
167
168     while(cur.hasNext)
169       buff += _deserializeEvent(cur.next)
170
171     sortWith match {
172       case Some(sorter) => buff.toList.sortWith(sorter)
173       case None => buff.toList
174     }
175   }
176
177   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
178     if (one.timestamp > two.timestamp) false
179     else if (one.timestamp < two.timestamp) true
180     else true
181   }
182
183   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
184     if (one.timestamp < two.timestamp) false
185     else if (one.timestamp > two.timestamp) true
186     else true
187   }
188
189   //+EventStore
190   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = _store(event, events)
191
192   def findEventById[A <: AquariumEvent](id: String): Option[A] = _findById[A](id, events)
193
194   def findEventsByUserId[A <: AquariumEvent](userId: Long)
195                                             (sortWith: Option[(A, A) => Boolean]): List[A] = {
196     val q = new BasicDBObject()
197     q.put("userId", userId)
198
199     _query(q, events)(sortWith)
200   }
201   //-EventStore
202
203   //+UserStore
204   def storeUserState(userState: UserState): Maybe[RecordID] = {
205     Maybe {
206       val dbObj = _insertObject(users, userState)
207       val id    = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
208       RecordID(id)
209     }
210   }
211
212   def findUserStateByUserId(userId: String): Maybe[UserState] = {
213     Maybe {
214       val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
215       val cursor = events.find(queryObj)
216
217       if(!cursor.hasNext) {
218         cursor.close()
219         null
220       } else {
221         val userState = _deserializeUserState(cursor.next())
222         cursor.close()
223         userState
224       }
225     }
226   }
227   //-UserStore
228
229   //+IMStore
230   def storeUserEvent(event: UserEvent): Maybe[RecordID] = _store(event, imevents)
231
232   def findUserEventById[A <: UserEvent](id: String): Option[A] = _findById(id, imevents)
233
234   def findUserEventsByUserId[A <: UserEvent](userId: String)(sortWith: Option[(A, A) => Boolean]): List[A] = {
235     val q = new BasicDBObject()
236     q.put("userId", userId)
237
238     _query(q, imevents)(sortWith)
239   }
240
241   def findLastUserEvent(userId: String) =
242     findUserEventsByUserId(userId)(Some(_sortByTimestampDesc)).headOption
243
244   def userExists(userId: String) = false //TODO: Write me
245   //-IMStore
246
247   //+WalletStore
248   def store(entry: WalletEntry): Maybe[RecordID] = _store(entry, wallets)
249
250   def findEntryById(id: String): Option[WalletEntry] = _findById[WalletEntry](id, wallets)
251
252   def findAllUserEntries(userId: String) = findUserEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
253
254   def findUserEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
255     val q = new BasicDBObject()
256     q.put("timestamp", new BasicDBObject("$gt", from.getTime))
257     q.put("timestamp", new BasicDBObject("$lt", to.getTime))
258     q.put("userId", userId)
259
260     _query[WalletEntry](q, wallets)(Some(_sortByTimestampAsc))
261   }
262   //-WalletStore
263 }
264
265 object MongoDBStore {
266   def EVENTS_COLLECTION = "events"
267   def USERS_COLLECTION = "users"
268   def IM_EVENTS_COLLECTION = "imevents"
269   def IM_WALLETS = "wallets"
270 }