Implement MondoDB-based UserStore with basic queries.
[aquarium] / logic / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
41 import com.mongodb.util.JSON
42 import gr.grnet.aquarium.store.{UserStore, RecordID, StoreException, EventStore}
43 import gr.grnet.aquarium.user.UserState
44 import gr.grnet.aquarium.user.UserState.JsonNames
45 import gr.grnet.aquarium.util.displayableObjectInfo
46 import gr.grnet.aquarium.util.json.JsonSupport
47 import com.mongodb._
48 import collection.mutable.{ArrayBuffer, ListBuffer}
49
50 /**
51  * Mongodb implementation of the event store (and soon the user store).
52  *
53  * @author Christos KK Loverdos <loverdos@gmail.com>
54  * @author Georgios Gousios <gousiosg@gmail.com>
55  */
56 class MongoDBStore(
57     val mongo: Mongo,
58     val database: String,
59     val username: String,
60     val password: String)
61   extends EventStore with UserStore with Loggable {
62   
63   private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
64   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
65
66   private[this] def getCollection(name: String): DBCollection = {
67     val db = mongo.getDB(database)
68     if(!db.authenticate(username, password.toCharArray)) {
69       throw new StoreException("Could not authenticate user %s".format(username))
70     }
71     db.getCollection(name)
72   }
73
74   private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
75     //TODO: Distinguish events and deserialize appropriately
76     ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
77   }
78   
79   private[this] def _deserializeUserState(dbObj: DBObject): UserState = {
80     val jsonString = JSON.serialize(dbObj)
81     UserState.fromJson(jsonString)
82   }
83
84   private[this] def _makeDBObject(any: JsonSupport): DBObject = {
85     JSON.parse(any.toJson) match {
86       case dbObject: DBObject ⇒
87         dbObject
88       case _ ⇒
89         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
90     }
91   }
92
93   private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
94     val dbObj = new BasicDBObject(1)
95     dbObj.put(name, value)
96     dbObj
97   }
98
99   private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
100     val dbObj = _makeDBObject(obj)
101     collection insert dbObj
102     dbObj
103   }
104
105   private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport,  idName: String, id: String): String = {
106     val cursor = collection.find(_prepareFieldQuery(idName, id))
107     if (!cursor.hasNext) {
108       val errMsg = "Failed to store %s".format(displayableObjectInfo(obj))
109       logger.error(errMsg)
110       throw new StoreException(errMsg)
111     }
112
113     val retval = cursor.next.get("_id").toString
114     cursor.close()
115     retval
116   }
117
118   /* TODO: Some of the following methods rely on JSON (de-)serialization).
119    * A method based on proper object serialization would be much faster.
120    */
121
122   //+EventStore
123   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
124     try {
125       // Store
126       val dbObj = _makeDBObject(event)
127       events.insert(dbObj)
128
129       // TODO: Make this retrieval a configurable option
130       // Get back to retrieve unique id
131       val cursor = events.find(_prepareFieldQuery("id", event.id))
132
133
134       Just(RecordID(cursor.next.get("_id").toString))
135     } catch {
136       case m: MongoException =>
137         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
138     }
139   }
140
141   def findEventById[A <: AquariumEvent](id: String): Option[A] = {
142     val q = new BasicDBObject()
143     q.put("id", id)
144
145     val cur = events.find(q)
146
147     if (cur.hasNext)
148       Some(_deserializeEvent(cur.next))
149     else
150       None
151   }
152
153   def findEventsByUserId[A <: AquariumEvent](userId: Long)
154                                             (sortWith: Option[(A, A) => Boolean]): List[A] = {
155     val q = new BasicDBObject()
156     q.put("userId", userId)
157
158     val cur = events.find(q)
159
160     if (!cur.hasNext)
161       return List()
162
163     val buff = new ListBuffer[A]()
164
165     while(cur.hasNext)
166       buff += _deserializeEvent(cur.next)
167
168     sortWith match {
169       case Some(sorter) => buff.toList.sortWith(sorter)
170       case None => buff.toList
171     }
172   }
173   //-EventStore
174
175   //+UserStore
176   def storeUserState(userState: UserState): Maybe[RecordID] = {
177     Maybe {
178       val dbObj = _insertObject(users, userState)
179       val id    = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
180       RecordID(id)
181     }
182   }
183
184   def findUserStateByUserId(userId: String): Maybe[UserState] = {
185     Maybe {
186       val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
187       val cursor = events.find(queryObj)
188
189       if(!cursor.hasNext) {
190         cursor.close()
191         null
192       } else {
193         val userState = _deserializeUserState(cursor.next())
194         cursor.close()
195         userState
196       }
197     }
198   }
199   //-UserStore
200 }
201
202 object MongoDBStore {
203   def EVENTS_COLLECTION = "events"
204   def USERS_COLLECTION = "users"
205 }