422f3f36c07d3cd39fa4932c8799e80d4507f357
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.ckkloverdos.maybe.{Failed, Just, Maybe}
40 import com.mongodb.util.JSON
41 import gr.grnet.aquarium.user.UserState
42 import gr.grnet.aquarium.user.UserState.JsonNames
43 import gr.grnet.aquarium.util.displayableObjectInfo
44 import gr.grnet.aquarium.util.json.JsonSupport
45 import collection.mutable.{ListBuffer}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.logic.events.{WalletEntry, UserEvent, ResourceEvent, AquariumEvent}
48 import java.util.Date
49 import com.mongodb._
50
51 /**
52  * Mongodb implementation of the event _store (and soon the user _store).
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  */
57 class MongoDBStore(
58     val mongo: Mongo,
59     val database: String,
60     val username: String,
61     val password: String)
62   extends EventStore with UserStore with WalletStore with Loggable {
63
64   private[store] lazy val events: DBCollection = getCollection(MongoDBStore.EVENTS_COLLECTION)
65   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
66   private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
67   private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS)
68
69   private[this] def getCollection(name: String): DBCollection = {
70     val db = mongo.getDB(database)
71     if(!db.authenticate(username, password.toCharArray)) {
72       throw new StoreException("Could not authenticate user %s".format(username))
73     }
74     db.getCollection(name)
75   }
76
77   /* TODO: Some of the following methods rely on JSON (de-)serialization).
78   * A method based on proper object serialization would be much faster.
79   */
80
81   private[this] def _deserializeEvent[A <: AquariumEvent](a: DBObject): A = {
82     //TODO: Distinguish events and deserialize appropriately
83     ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
84   }
85
86   private[this] def _deserializeUserState(dbObj: DBObject): UserState = {
87     val jsonString = JSON.serialize(dbObj)
88     UserState.fromJson(jsonString)
89   }
90
91   private[this] def _makeDBObject(any: JsonSupport): DBObject = {
92     JSON.parse(any.toJson) match {
93       case dbObject: DBObject ⇒
94         dbObject
95       case _ ⇒
96         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
97     }
98   }
99
100   private[this] def _prepareFieldQuery(name: String, value: String): DBObject = {
101     val dbObj = new BasicDBObject(1)
102     dbObj.put(name, value)
103     dbObj
104   }
105
106   private[this] def _insertObject(collection: DBCollection, obj: JsonSupport): DBObject = {
107     val dbObj = _makeDBObject(obj)
108     collection insert dbObj
109     dbObj
110   }
111
112   private[this] def _checkWasInserted(collection: DBCollection, obj: JsonSupport,  idName: String, id: String): String = {
113     val cursor = collection.find(_prepareFieldQuery(idName, id))
114     if (!cursor.hasNext) {
115       val errMsg = "Failed to _store %s".format(displayableObjectInfo(obj))
116       logger.error(errMsg)
117       throw new StoreException(errMsg)
118     }
119
120     val retval = cursor.next.get("_id").toString
121     cursor.close()
122     retval
123   }
124
125   private[this] def _store[A <: AquariumEvent](entry: A, col: DBCollection) : Maybe[RecordID] = {
126     try {
127       // Store
128       val dbObj = _makeDBObject(entry)
129       col.insert(dbObj)
130
131       // Get back to retrieve unique id
132       val cursor = col.find(_prepareFieldQuery("id", entry.id))
133
134       if (!cursor.hasNext) {
135         logger.error("Failed to _store entry: %s".format(entry))
136         return Failed(new StoreException("Failed to _store entry: %s".format(entry)))
137       }
138
139       Just(RecordID(cursor.next.get("_id").toString))
140     } catch {
141       case m: MongoException =>
142         logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
143     }
144   }
145
146   private[this] def _findById[A <: AquariumEvent](id: String, col: DBCollection) : Option[A] = {
147     val q = new BasicDBObject()
148     q.put("id", id)
149
150     val cur = col.find(q)
151
152     if (cur.hasNext)
153       Some(_deserializeEvent(cur.next))
154     else
155       None
156   }
157   
158   private[this] def _query[A <: AquariumEvent](q: BasicDBObject,
159                                               col: DBCollection)
160                                               (sortWith: Option[(A, A) => Boolean]): List[A] = {
161     val cur = col.find(q)
162     if (!cur.hasNext)
163       return List()
164
165     val buff = new ListBuffer[A]()
166
167     while(cur.hasNext)
168       buff += _deserializeEvent(cur.next)
169
170     sortWith match {
171       case Some(sorter) => buff.toList.sortWith(sorter)
172       case None => buff.toList
173     }
174   }
175
176   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
177     if (one.timestamp > two.timestamp) false
178     else if (one.timestamp < two.timestamp) true
179     else true
180   }
181
182   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
183     if (one.timestamp < two.timestamp) false
184     else if (one.timestamp > two.timestamp) true
185     else true
186   }
187
188   //+EventStore
189   def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = _store(event, events)
190
191   def findEventById[A <: AquariumEvent](id: String): Option[A] = _findById[A](id, events)
192
193   def findEventsByUserId[A <: AquariumEvent](userId: String)
194                                             (sortWith: Option[(A, A) => Boolean]): List[A] = {
195     val q = new BasicDBObject()
196     q.put("userId", userId)
197
198     _query(q, events)(sortWith)
199   }
200   //-EventStore
201
202   //+UserStore
203   def storeUserState(userState: UserState): Maybe[RecordID] = {
204     Maybe {
205       val dbObj = _insertObject(users, userState)
206       val id    = _checkWasInserted(users, userState, JsonNames.userId, userState.userId)
207       RecordID(id)
208     }
209   }
210
211   def findUserStateByUserId(userId: String): Maybe[UserState] = {
212     Maybe {
213       val queryObj = _prepareFieldQuery(JsonNames.userId, userId)
214       val cursor = events.find(queryObj)
215
216       if(!cursor.hasNext) {
217         cursor.close()
218         null
219       } else {
220         val userState = _deserializeUserState(cursor.next())
221         cursor.close()
222         userState
223       }
224     }
225   }
226   //-UserStore
227
228   //+WalletStore
229   def store(entry: WalletEntry): Maybe[RecordID] = _store(entry, wallets)
230
231   def findEntryById(id: String): Option[WalletEntry] = _findById[WalletEntry](id, wallets)
232
233   def findAllUserEntries(userId: String) = findUserEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
234
235   def findUserEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
236     val q = new BasicDBObject()
237     q.put("timestamp", new BasicDBObject("$gt", from.getTime))
238     q.put("timestamp", new BasicDBObject("$lt", to.getTime))
239     q.put("userId", userId)
240
241     _query[WalletEntry](q, wallets)(Some(_sortByTimestampAsc))
242   }
243   //-WalletStore
244 }
245
246 object MongoDBStore {
247   def EVENTS_COLLECTION = "events"
248   def USERS_COLLECTION = "users"
249   def IM_EVENTS_COLLECTION = "imevents"
250   def IM_WALLETS = "wallets"
251 }