870fd134f60d29fdaf5e03570fb2cfb715b8ceee
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event._
43 import gr.grnet.aquarium.event.im.IMEventModel
44 import gr.grnet.aquarium.event.im.IMEventModel.{Names ⇒ IMEventNames}
45 import gr.grnet.aquarium.event.resource.ResourceEventModel
46 import gr.grnet.aquarium.event.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
47 import gr.grnet.aquarium.store._
48 import gr.grnet.aquarium.event.WalletEntry.{JsonNames ⇒ WalletJsonNames}
49 import gr.grnet.aquarium.event.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
50 import java.util.Date
51 import gr.grnet.aquarium.logic.accounting.Policy
52 import com.mongodb._
53 import org.bson.types.ObjectId
54 import com.ckkloverdos.maybe.Maybe
55 import gr.grnet.aquarium.util._
56 import gr.grnet.aquarium.converter.Conversions
57 import gr.grnet.aquarium.computation.UserState
58
59 /**
60  * Mongodb implementation of the various aquarium stores.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  */
65 class MongoDBStore(
66     val mongo: Mongo,
67     val database: String,
68     val username: String,
69     val password: String)
70   extends ResourceEventStore
71   with UserStateStore
72   with WalletEntryStore
73   with IMEventStore
74   with PolicyStore
75   with Loggable {
76
77   override type IMEvent = MongoDBIMEvent
78   override type ResourceEvent = MongoDBResourceEvent
79
  // Handles to the backing MongoDB collections. Each handle is resolved on first access
  // through getCollection, using the collection names defined in the MongoDBStore companion.
  private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
  private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
  private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
  private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
  private[store] lazy val walletEntries    = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
  private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
86
87   private[this] def getCollection(name: String): DBCollection = {
88     val db = mongo.getDB(database)
89     //logger.debug("Authenticating to mongo")
90     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
91       throw new StoreException("Could not authenticate user %s".format(username))
92     }
93     db.getCollection(name)
94   }
95
96   private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
97     if (one.occurredMillis > two.occurredMillis) false
98     else if (one.occurredMillis < two.occurredMillis) true
99     else true
100   }
101
102   //+ResourceEventStore
103   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
104     MongoDBResourceEvent.fromOther(event, null)
105   }
106
107   def insertResourceEvent(event: ResourceEventModel) = {
108     val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
109     MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
110     localEvent
111   }
112
113   def findResourceEventById(id: String): Option[ResourceEvent] = {
114     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
115   }
116
117   def findResourceEventsByUserId(userId: String)
118                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
119     val query = new BasicDBObject(ResourceEventNames.userID, userId)
120
121     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
122   }
123
124   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
125     val query = new BasicDBObject()
126     query.put(ResourceEventNames.userID, userId)
127     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
128     
129     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
130
131     val cursor = resourceEvents.find(query).sort(sort)
132
133     try {
134       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
135       while(cursor.hasNext) {
136         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
137       }
138       buffer.toList.sortWith(_sortByTimestampAsc)
139     } finally {
140       cursor.close()
141     }
142   }
143
144   def findResourceEventHistory(userId: String, resName: String,
145                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
146     val query = new BasicDBObject()
147     query.put(ResourceEventNames.userID, userId)
148     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
149     query.put(ResourceEventNames.resource, resName)
150
151     instid match {
152       case Some(id) =>
153         Policy.policy.findResource(resName) match {
154           case Some(y) => query.put(ResourceEventNames.details,
155             new BasicDBObject(y.descriminatorField, instid.get))
156           case None =>
157         }
158       case None =>
159     }
160
161     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
162     val cursor = resourceEvents.find(query).sort(sort)
163
164     try {
165       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
166       while(cursor.hasNext) {
167         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
168       }
169       buffer.toList.sortWith(_sortByTimestampAsc)
170     } finally {
171       cursor.close()
172     }
173   }
174
175   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
176     val query = new BasicDBObject()
177     query.put(ResourceEventNames.userID, userId)
178     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
179     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
180
181     // Sort them by increasing order for occurred time
182     val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
183
184     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
185   }
186   
187   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
188     // FIXME: Implement
189     0L
190   }
191
192   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
193                                                     startMillis: Long,
194                                                     stopMillis: Long): List[ResourceEvent] = {
195     // FIXME: Implement
196     Nil
197   }
198   //-ResourceEventStore
199
200   //+ UserStateStore
201   def insertUserState(userState: UserState) = {
202     MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject)
203   }
204
205   def findUserStateByUserID(userID: String): Option[UserState] = {
206     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
207     val cursor = userStates find query
208
209     withCloseable(cursor) { cursor ⇒
210       if(cursor.hasNext)
211         Some(MongoDBStore.dbObjectToUserState(cursor.next()))
212       else
213         None
214     }
215   }
216
217
  /**
   * Placeholder: not implemented yet.
   *
   * Always evaluates to `null`, whatever `userID` is given, so callers must not
   * dereference the result.
   */
  def findLatestUserStateByUserID(userID: String) = {
    // FIXME: implement
    null
  }
222
223   def findLatestUserStateForEndOfBillingMonth(userId: String,
224                                               yearOfBillingMonth: Int,
225                                               billingMonth: Int): Option[UserState] = {
226     None // FIXME: implement
227   }
228
229   def deleteUserState(userId: String) = {
230     val query = new BasicDBObject(UserStateJsonNames.userID, userId)
231     userStates.findAndRemove(query)
232   }
233   //- UserStateStore
234
235   //+WalletEntryStore
236   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
237     Maybe {
238       MongoDBStore.storeAny[WalletEntry](
239         entry,
240         walletEntries,
241         WalletJsonNames.id,
242         (e) => e.id,
243         MongoDBStore.jsonSupportToDBObject)
244     }
245   }
246
247   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
248     MongoDBStore.findBy(WalletJsonNames.id, id, walletEntries, MongoDBStore.dbObjectToWalletEntry): Maybe[WalletEntry]
249   }
250
251   def findUserWalletEntries(userId: String) = {
252     // TODO: optimize
253     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
254   }
255
256   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
257     val q = new BasicDBObject()
258     // TODO: Is this the correct way for an AND query?
259     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
260     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
261     q.put(WalletJsonNames.userId, userId)
262
263     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
264   }
265
266   def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
267     val q = new BasicDBObject()
268     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
269     q.put(WalletJsonNames.userId, userId)
270
271     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
272   }
273
274   def findLatestUserWalletEntries(userId: String) = {
275     Maybe {
276       val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
277       val cursor = walletEntries.find().sort(orderBy)
278
279       try {
280         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
281         if(cursor.hasNext) {
282           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
283           buffer += walletEntry
284
285           var _previousOccurredMillis = walletEntry.occurredMillis
286           var _ok = true
287
288           while(cursor.hasNext && _ok) {
289             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
290             var currentOccurredMillis = walletEntry.occurredMillis
291             _ok = currentOccurredMillis == _previousOccurredMillis
292             
293             if(_ok) {
294               buffer += walletEntry
295             }
296           }
297
298           buffer.toList
299         } else {
300           null
301         }
302       } finally {
303         cursor.close()
304       }
305     }
306   }
307
308   def findPreviousEntry(userId: String, resource: String,
309                         instanceId: String,
310                         finalized: Option[Boolean]): List[WalletEntry] = {
311     val q = new BasicDBObject()
312     q.put(WalletJsonNames.userId, userId)
313     q.put(WalletJsonNames.resource, resource)
314     q.put(WalletJsonNames.instanceId, instanceId)
315     finalized match {
316       case Some(x) => q.put(WalletJsonNames.finalized, x)
317       case None =>
318     }
319
320     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
321   }
322   //-WalletEntryStore
323
324   //+IMEventStore
325   def createIMEventFromJson(json: String) = {
326     MongoDBStore.createIMEventFromJson(json)
327   }
328
329   def createIMEventFromOther(event: IMEventModel) = {
330     MongoDBStore.createIMEventFromOther(event)
331   }
332
333   def insertIMEvent(event: IMEventModel): IMEvent = {
334     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
335     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
336     localEvent
337   }
338
339   def findIMEventById(id: String): Option[IMEvent] = {
340     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
341   }
342   //-IMEventStore
343
344   //+PolicyStore
345   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
346     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
347       new BasicDBObject("$gt", after))
348     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
349   }
350
351   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
352
353
354   def updatePolicyEntry(policy: PolicyEntry) = {
355     //Find the entry
356     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
357     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
358     policyEntries.update(query, policyObject, true, false)
359   }
360   
361   def findPolicyEntry(id: String) = {
362     MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
363   }
364
365   //-PolicyStore
366 }
367
368 object MongoDBStore {
369   object JsonNames {
370     final val _id = "_id"
371   }
372
373   /**
374    * Collection holding the [[gr.grnet.aquarium.event.resource.ResourceEventModel]]s.
375    *
376    * Resource events are coming from all systems handling billable resources.
377    */
378   final val RESOURCE_EVENTS_COLLECTION = "resevents"
379
380   /**
381    * Collection holding the snapshots of [[gr.grnet.aquarium.computation.UserState]].
382    *
383    * [[gr.grnet.aquarium.computation.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user.UserActor]]s.
384    */
385   final val USER_STATES_COLLECTION = "userstates"
386
387   /**
388    * Collection holding [[gr.grnet.aquarium.event.im.IMEventModel]]s.
389    *
390    * User events are coming from the IM module (external).
391    */
392   final val IM_EVENTS_COLLECTION = "imevents"
393
394   /**
395    * Collection holding [[gr.grnet.aquarium.event.im.IMEventModel]]s that could not be parsed to normal objects.
396    *
397    * We of course assume at least a valid JSON representation.
398    *
399    * User events are coming from the IM module (external).
400    */
401   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
402
403   /**
404    * Collection holding [[gr.grnet.aquarium.event.WalletEntry]].
405    *
406    * Wallet entries are generated internally in Aquarium.
407    */
408   final val WALLET_ENTRIES_COLLECTION = "wallets"
409
410   /**
411    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
412    */
413 //  final val POLICIES_COLLECTION = "policies"
414
415   /**
416    * Collection holding [[gr.grnet.aquarium.event.PolicyEntry]].
417    */
418   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
419
420   def dbObjectToUserState(dbObj: DBObject): UserState = {
421     UserState.fromJson(JSON.serialize(dbObj))
422   }
423
424   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
425     WalletEntry.fromJson(JSON.serialize(dbObj))
426   }
427
428   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
429     PolicyEntry.fromJson(JSON.serialize(dbObj))
430   }
431
432   def findBy[A >: Null <: AnyRef](name: String,
433                                   value: String,
434                                   collection: DBCollection,
435                                   deserializer: (DBObject) => A) : Option[A] = {
436     val query = new BasicDBObject(name, value)
437     val cursor = collection find query
438
439     withCloseable(cursor) { cursor ⇒
440       if(cursor.hasNext)
441         Some(deserializer apply cursor.next)
442       else
443         None
444     }
445   }
446
447   def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
448                                   (deserializer: (DBObject) => A)
449                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
450     val cursor0 = collection find query
451     val cursor = if(orderBy ne null) {
452       cursor0 sort orderBy
453     } else {
454       cursor0
455     } // I really know that docs say that it is the same cursor.
456
457     if(!cursor.hasNext) {
458       cursor.close()
459       Nil
460     } else {
461       val buff = new ListBuffer[A]()
462
463       while(cursor.hasNext) {
464         buff += deserializer apply cursor.next
465       }
466
467       cursor.close()
468
469       sortWith match {
470         case Some(sorter) => buff.toList.sortWith(sorter)
471         case None => buff.toList
472       }
473     }
474   }
475
476   def storeUserState(userState: UserState, collection: DBCollection) = {
477     storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
478   }
479   
480   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
481     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
482   }
483
484   def storeAny[A](any: A,
485                   collection: DBCollection,
486                   idName: String,
487                   idValueProvider: (A) => String,
488                   serializer: (A) => DBObject) : RecordID = {
489
490     val dbObject = serializer apply any
491     val _id = new ObjectId()
492     dbObject.put("_id", _id)
493     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
494     writeResult.getLastError().throwOnError()
495
496     RecordID(dbObject.get("_id").toString)
497   }
498
499   // FIXME: consolidate
500   def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
501     val dbObject = serializer apply obj
502     val objectId = obj._id  match {
503       case null ⇒
504         val _id = new ObjectId()
505         dbObject.put("_id", _id)
506         _id
507
508       case _id ⇒
509         _id
510     }
511
512     dbObject.put(JsonNames._id, objectId)
513
514     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
515
516     obj
517   }
518
519   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
520     val dbObject = serializer apply obj
521     val objectId = obj._id  match {
522       case null ⇒
523         val _id = new ObjectId()
524         dbObject.put("_id", _id)
525         _id
526
527       case _id ⇒
528         _id
529     }
530
531     dbObject.put(JsonNames._id, objectId)
532
533     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
534   }
535
536   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
537     Conversions.jsonSupportToDBObject(jsonSupport)
538   }
539
540   final def isLocalIMEvent(event: IMEventModel) = event match {
541     case _: MongoDBIMEvent ⇒ true
542     case _ ⇒ false
543   }
544
545   final def createIMEventFromJson(json: String) = {
546     MongoDBIMEvent.fromJsonString(json)
547   }
548
549   final def createIMEventFromOther(event: IMEventModel) = {
550     MongoDBIMEvent.fromOther(event, null)
551   }
552
553   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
554     MongoDBIMEvent.fromJsonBytes(jsonBytes)
555   }
556 }