Consistent `Id` naming
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.util.displayableObjectInfo
42 import gr.grnet.aquarium.util.json.JsonSupport
43 import collection.mutable.ListBuffer
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNames}
46 import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames}
47 import java.util.Date
48 import com.ckkloverdos.maybe.Maybe
49 import com.mongodb._
50 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent}
51
52 /**
53  * Mongodb implementation of the various aquarium stores.
54  *
55  * @author Christos KK Loverdos <loverdos@gmail.com>
56  * @author Georgios Gousios <gousiosg@gmail.com>
57  */
58 class MongoDBStore(
59     val mongo: Mongo,
60     val database: String,
61     val username: String,
62     val password: String)
63   extends ResourceEventStore with UserStateStore
64   with WalletEntryStore with UserEventStore
65   with Loggable {
66
67   private[store] lazy val rcEvents      = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
68   private[store] lazy val userStates    = getCollection(MongoDBStore.USER_STATES_COLLECTION)
69   private[store] lazy val userEvents    = getCollection(MongoDBStore.USER_EVENTS_COLLECTION)
70   private[store] lazy val walletEntries = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
71
72   private[this] def getCollection(name: String): DBCollection = {
73     val db = mongo.getDB(database)
74     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
75       throw new StoreException("Could not authenticate user %s".format(username))
76     }
77     db.getCollection(name)
78   }
79
80   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
81     if (one.occurredMillis > two.occurredMillis) false
82     else if (one.occurredMillis < two.occurredMillis) true
83     else true
84   }
85
86   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
87     if (one.occurredMillis < two.occurredMillis) false
88     else if (one.occurredMillis > two.occurredMillis) true
89     else true
90   }
91
92   //+ResourceEventStore
93   def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
94     MongoDBStore.storeAquariumEvent(event, rcEvents)
95
96   def findResourceEventById(id: String): Maybe[ResourceEvent] =
97     MongoDBStore.findById(id, rcEvents, MongoDBStore.dbObjectToResourceEvent)
98
99   def findResourceEventsByUserId(userId: String)
100                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
101     val query = new BasicDBObject(ResourceJsonNames.userId, userId)
102
103     MongoDBStore.runQuery(query, rcEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
104   }
105
106   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
107     val query = new BasicDBObject()
108     query.put(ResourceJsonNames.userId, userId)
109     query.put(ResourceJsonNames.timestamp, new BasicDBObject("$gte", timestamp))
110     
111     val sort = new BasicDBObject(ResourceJsonNames.timestamp, 1)
112
113     val cursor = rcEvents.find(query).sort(sort)
114
115     try {
116       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
117       while(cursor.hasNext) {
118         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
119       }
120       buffer.toList
121     } finally {
122       cursor.close()
123     }
124   }
125   //-ResourceEventStore
126
127   //+UserStateStore
128   def storeUserState(userState: UserState): Maybe[RecordID] =
129     MongoDBStore.storeUserState(userState, userStates)
130
131   def findUserStateByUserId(userId: String): Maybe[UserState] = {
132     Maybe {
133       val query = new BasicDBObject(ResourceJsonNames.userId, userId)
134       val cursor = userStates find query
135
136       try {
137         if(cursor.hasNext)
138           MongoDBStore.dbObjectToUserState(cursor.next())
139         else
140           null
141       } finally {
142         cursor.close()
143       }
144     }
145   }
146   //-UserStateStore
147
148   //+WalletEntryStore
149   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
150     MongoDBStore.storeAquariumEvent(entry, walletEntries)
151
152   def findWalletEntryById(id: String): Maybe[WalletEntry] =
153     MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
154
155   def findUserWalletEntries(userId: String) = {
156     // TODO: optimize
157     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
158   }
159
160   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
161     val q = new BasicDBObject()
162     // TODO: Is this the correct way for an AND query?
163     q.put(ResourceJsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
164     q.put(ResourceJsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
165     q.put(ResourceJsonNames.userId, userId)
166
167     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
168   }
169
170   def findLatestUserWalletEntries(userId: String) = {
171     Maybe {
172       val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, -1) // -1 is descending order
173       val cursor = walletEntries.find().sort(orderBy)
174
175       try {
176         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
177         if(cursor.hasNext) {
178           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
179           buffer += walletEntry
180
181           var _previousOccurredMillis = walletEntry.occurredMillis
182           var _ok = true
183
184           while(cursor.hasNext && _ok) {
185             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
186             var currentOccurredMillis = walletEntry.occurredMillis
187             _ok = currentOccurredMillis == _previousOccurredMillis
188             
189             if(_ok) {
190               buffer += walletEntry
191             }
192           }
193
194           buffer.toList
195         } else {
196           null
197         }
198       } finally {
199         cursor.close()
200       }
201     }
202   }
203
204   def findPreviousEntry(userId: String, resource: String,
205                         instanceid: String,
206                         finalized: Option[Boolean]): List[WalletEntry] = {
207     val q = new BasicDBObject()
208     q.put(WalletJsonNames.userId, userId)
209     q.put(WalletJsonNames.resource, resource)
210     q.put(WalletJsonNames.instanceId, instanceid)
211     finalized match {
212       case Some(x) => q.put(WalletJsonNames.finalized, x)
213       case None =>
214     }
215
216     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
217   }
218   //-WalletEntryStore
219
220   //+UserEventStore
221   def storeUserEvent(event: UserEvent): Maybe[RecordID] =
222     MongoDBStore.storeAny[UserEvent](event, userEvents, ResourceJsonNames.userId,
223       _.userId, MongoDBStore.jsonSupportToDBObject)
224
225
226   def findUserEventById(id: String): Maybe[UserEvent] =
227     MongoDBStore.findById[UserEvent](id, userEvents, MongoDBStore.dbObjectToUserEvent)
228
229   def findUserEventsByUserId(userId: String)
230                             (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] = {
231     val query = new BasicDBObject(ResourceJsonNames.userId, userId)
232     MongoDBStore.runQuery(query, userEvents)(MongoDBStore.dbObjectToUserEvent)(sortWith)
233   }
234   //-UserEventStore
235 }
236
237 object MongoDBStore {
238   /**
239    * Collection holding the [[gr.grnet.aquarium.logic.events.ResourceEvent]]s.
240    *
241    * Resource events are coming from all systems handling billable resources.
242    */
243   final val RESOURCE_EVENTS_COLLECTION = "resevents"
244
245   /**
246    * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
247    *
248    * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.user.actor.UserActor]]s.
249    */
250   final val USER_STATES_COLLECTION = "userstates"
251
252   /**
253    * Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s.
254    *
255    * User events are coming from the IM module (external).
256    */
257   final val USER_EVENTS_COLLECTION = "userevents"
258
259
260   /**
261    * Collection holding [[gr.grnet.aquarium.logic.events.WalletEntry]].
262    *
263    * Wallet entries are generated internally in Aquarium.
264    */
265   final val WALLET_ENTRIES_COLLECTION = "wallets"
266
267   /* TODO: Some of the following methods rely on JSON (de-)serialization).
268   * A method based on proper object serialization would be much faster.
269   */
270   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
271     ResourceEvent.fromJson(JSON.serialize(dbObject))
272   }
273
274   def dbObjectToUserState(dbObj: DBObject): UserState = {
275     UserState.fromJson(JSON.serialize(dbObj))
276   }
277
278   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
279     WalletEntry.fromJson(JSON.serialize(dbObj))
280   }
281
282   def dbObjectToUserEvent(dbObj: DBObject): UserEvent = {
283     UserEvent.fromJson(JSON.serialize(dbObj))
284   }
285
286   def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
287     val query = new BasicDBObject(ResourceJsonNames.id, id)
288     val cursor = collection find query
289
290     try {
291       if(cursor.hasNext)
292         deserializer apply cursor.next
293       else
294         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
295     } finally {
296       cursor.close()
297     }
298   }
299
300   def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
301                                   (deserializer: (DBObject) => A)
302                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
303     val cursor0 = collection find query
304     val cursor = if(orderBy ne null) {
305       cursor0 sort orderBy
306     } else {
307       cursor0
308     } // I really know that docs say that it is the same cursor.
309
310     if(!cursor.hasNext) {
311       cursor.close()
312       Nil
313     } else {
314       val buff = new ListBuffer[A]()
315
316       while(cursor.hasNext) {
317         buff += deserializer apply cursor.next
318       }
319
320       cursor.close()
321
322       sortWith match {
323         case Some(sorter) => buff.toList.sortWith(sorter)
324         case None => buff.toList
325       }
326     }
327   }
328
329   def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
330     storeAny[A](event, collection, ResourceJsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
331   }
332
333   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
334     storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
335   }
336
337   def storeAny[A](any: A,
338                   collection: DBCollection,
339                   idName: String,
340                   idValueProvider: (A) => String,
341                   serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
342     // Store
343     val dbObj = serializer apply any
344     val writeResult = collection insert dbObj
345     writeResult.getLastError().throwOnError()
346
347     // Get back to retrieve unique id
348     val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
349
350     try {
351       // TODO: better way to get _id?
352       if(cursor.hasNext)
353         RecordID(cursor.next().get(ResourceJsonNames._id).toString)
354       else
355         throw new StoreException("Could not store %s to %s".format(any, collection))
356     } finally {
357       cursor.close()
358     }
359  }
360
361   def jsonSupportToDBObject(any: JsonSupport): DBObject = {
362     JSON.parse(any.toJson) match {
363       case dbObject: DBObject ⇒
364         dbObject
365       case _ ⇒
366         throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
367     }
368   }
369 }