WIP: IMEventModel end-to-end chain
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.user.UserState
40 import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames}
41 import gr.grnet.aquarium.util.json.JsonSupport
42 import collection.mutable.ListBuffer
43 import gr.grnet.aquarium.event.im.IMEventModel.{Names => IMEventNames}
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.event.ResourceEvent.{JsonNames => ResourceJsonNames}
46 import gr.grnet.aquarium.event.WalletEntry.{JsonNames => WalletJsonNames}
47 import gr.grnet.aquarium.event.PolicyEntry.{JsonNames => PolicyJsonNames}
48 import java.util.Date
49 import gr.grnet.aquarium.logic.accounting.Policy
50 import com.mongodb._
51 import org.bson.types.ObjectId
52 import gr.grnet.aquarium.event._
53 import com.ckkloverdos.maybe.{NoVal, Maybe}
54 import im.IMEventModel
55 import gr.grnet.aquarium.util._
56 import gr.grnet.aquarium.converter.{Conversions, StdConverters}
57
58 /**
59  * Mongodb implementation of the various aquarium stores.
60  *
61  * @author Christos KK Loverdos <loverdos@gmail.com>
62  * @author Georgios Gousios <gousiosg@gmail.com>
63  */
64 class MongoDBStore(
65     val mongo: Mongo,
66     val database: String,
67     val username: String,
68     val password: String)
69   extends ResourceEventStore
70   with UserStateStore
71   with WalletEntryStore
72   with IMEventStore
73   with PolicyStore
74   with Loggable {
75
76   override type IMEvent = MongoDBIMEvent
77
78   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
79   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
80   private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
81   private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
82   private[store] lazy val walletEntries    = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
83   private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
84
85   private[this] def getCollection(name: String): DBCollection = {
86     val db = mongo.getDB(database)
87     //logger.debug("Authenticating to mongo")
88     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
89       throw new StoreException("Could not authenticate user %s".format(username))
90     }
91     db.getCollection(name)
92   }
93
94   private[this] def _sortByTimestampAsc[A <: AquariumEventModel](one: A, two: A): Boolean = {
95     if (one.occurredMillis > two.occurredMillis) false
96     else if (one.occurredMillis < two.occurredMillis) true
97     else true
98   }
99
100   private[this] def _sortByTimestampDesc[A <: AquariumEventSkeleton](one: A, two: A): Boolean = {
101     if (one.occurredMillis < two.occurredMillis) false
102     else if (one.occurredMillis > two.occurredMillis) true
103     else true
104   }
105
106   //+ResourceEventStore
107   def storeResourceEvent(event: ResourceEvent) = {
108     MongoDBStore.storeAny[ResourceEvent](
109       event,
110       resourceEvents,
111       ResourceJsonNames.id,
112       (e) => e.id,
113       MongoDBStore.jsonSupportToDBObject)
114   }
115
116   def findResourceEventById(id: String): Maybe[ResourceEvent] =
117     MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent)
118
119   def findResourceEventsByUserId(userId: String)
120                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
121     val query = new BasicDBObject(ResourceJsonNames.userId, userId)
122
123     MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
124   }
125
126   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
127     val query = new BasicDBObject()
128     query.put(ResourceJsonNames.userId, userId)
129     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp))
130     
131     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
132
133     val cursor = resourceEvents.find(query).sort(sort)
134
135     try {
136       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
137       while(cursor.hasNext) {
138         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
139       }
140       buffer.toList.sortWith(_sortByTimestampAsc)
141     } finally {
142       cursor.close()
143     }
144   }
145
146   def findResourceEventHistory(userId: String, resName: String,
147                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
148     val query = new BasicDBObject()
149     query.put(ResourceJsonNames.userId, userId)
150     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
151     query.put(ResourceJsonNames.resource, resName)
152
153     instid match {
154       case Some(id) =>
155         Policy.policy.findResource(resName) match {
156           case Some(y) => query.put(ResourceJsonNames.details,
157             new BasicDBObject(y.descriminatorField, instid.get))
158           case None =>
159         }
160       case None =>
161     }
162
163     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
164     val cursor = resourceEvents.find(query).sort(sort)
165
166     try {
167       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
168       while(cursor.hasNext) {
169         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
170       }
171       buffer.toList.sortWith(_sortByTimestampAsc)
172     } finally {
173       cursor.close()
174     }
175   }
176
177   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
178     val query = new BasicDBObject()
179     query.put(ResourceJsonNames.userId, userId)
180     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
181     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
182
183     // Sort them by increasing order for occurred time
184     val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
185
186     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None)
187   }
188   
189   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
190     Maybe {
191       // FIXME: Implement
192       0L
193     }
194   }
195
196   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
197                                                     startMillis: Long,
198                                                     stopMillis: Long): List[ResourceEvent] = {
199     // FIXME: Implement
200     Nil
201   }
202   //-ResourceEventStore
203
204   //+ UserStateStore
205   def storeUserState(userState: UserState): Maybe[RecordID] = {
206     MongoDBStore.storeUserState(userState, userStates)
207   }
208
209   def findUserStateByUserId(userId: String): Maybe[UserState] = {
210     Maybe {
211       val query = new BasicDBObject(UserStateJsonNames.userId, userId)
212       val cursor = userStates find query
213
214       try {
215         if(cursor.hasNext)
216           MongoDBStore.dbObjectToUserState(cursor.next())
217         else
218           null
219       } finally {
220         cursor.close()
221       }
222     }
223   }
224
225   def findLatestUserStateForEndOfBillingMonth(userId: String,
226                                               yearOfBillingMonth: Int,
227                                               billingMonth: Int): Maybe[UserState] = {
228     NoVal // FIXME: implement
229   }
230
231   def deleteUserState(userId: String) = {
232     val query = new BasicDBObject(UserStateJsonNames.userId, userId)
233     userStates.findAndRemove(query)
234   }
235   //- UserStateStore
236
237   //+WalletEntryStore
238   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
239     Maybe {
240       MongoDBStore.storeAny[WalletEntry](
241         entry,
242         walletEntries,
243         ResourceJsonNames.id,
244         (e) => e.id,
245         MongoDBStore.jsonSupportToDBObject)
246     }
247   }
248
249   def findWalletEntryById(id: String): Maybe[WalletEntry] =
250     MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
251
252   def findUserWalletEntries(userId: String) = {
253     // TODO: optimize
254     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
255   }
256
257   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
258     val q = new BasicDBObject()
259     // TODO: Is this the correct way for an AND query?
260     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
261     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
262     q.put(WalletJsonNames.userId, userId)
263
264     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
265   }
266
267   def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
268     val q = new BasicDBObject()
269     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
270     q.put(WalletJsonNames.userId, userId)
271
272     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
273   }
274
275   def findLatestUserWalletEntries(userId: String) = {
276     Maybe {
277       val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
278       val cursor = walletEntries.find().sort(orderBy)
279
280       try {
281         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
282         if(cursor.hasNext) {
283           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
284           buffer += walletEntry
285
286           var _previousOccurredMillis = walletEntry.occurredMillis
287           var _ok = true
288
289           while(cursor.hasNext && _ok) {
290             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
291             var currentOccurredMillis = walletEntry.occurredMillis
292             _ok = currentOccurredMillis == _previousOccurredMillis
293             
294             if(_ok) {
295               buffer += walletEntry
296             }
297           }
298
299           buffer.toList
300         } else {
301           null
302         }
303       } finally {
304         cursor.close()
305       }
306     }
307   }
308
309   def findPreviousEntry(userId: String, resource: String,
310                         instanceId: String,
311                         finalized: Option[Boolean]): List[WalletEntry] = {
312     val q = new BasicDBObject()
313     q.put(WalletJsonNames.userId, userId)
314     q.put(WalletJsonNames.resource, resource)
315     q.put(WalletJsonNames.instanceId, instanceId)
316     finalized match {
317       case Some(x) => q.put(WalletJsonNames.finalized, x)
318       case None =>
319     }
320
321     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
322   }
323   //-WalletEntryStore
324
325   //+IMEventStore
326   def isLocalIMEvent(event: IMEventModel) = {
327     MongoDBStore.isLocalIMEvent(event)
328   }
329
330   def createIMEventFromJson(json: String) = {
331     MongoDBStore.createIMEventFromJson(json)
332   }
333
334   def createIMEventFromOther(event: IMEventModel) = {
335     MongoDBStore.createIMEventFromOther(event)
336   }
337
338 //  def storeUnparsed(json: String): Maybe[RecordID] = {
339 //    MongoDBStore.storeJustJson(json, unparsedIMEvents)
340 //  }
341
342   def insertIMEvent(event: IMEventModel): IMEvent = {
343     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId())
344     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
345     localEvent
346   }
347
348   def findIMEventById(id: String): Maybe[IMEvent] =
349     MongoDBStore.findById[IMEvent](id, imEvents, MongoDBStore.dbObjectToIMEvent)
350
351   def findIMEventsByUserId(userId: String): List[IMEvent] = {
352     val query = new BasicDBObject(IMEventNames.userID, userId)
353     MongoDBStore.runQuery(query, imEvents)(MongoDBStore.dbObjectToIMEvent)(Some(_sortByTimestampAsc))
354   }
355   //-IMEventStore
356
357   //+PolicyStore
358   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
359     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
360       new BasicDBObject("$gt", after))
361     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
362   }
363
364   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
365
366
367   def updatePolicyEntry(policy: PolicyEntry) = {
368     //Find the entry
369     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
370     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
371     policyEntries.update(query, policyObject, true, false)
372   }
373   
374   def findPolicyEntry(id: String) =
375     MongoDBStore.findById[PolicyEntry](id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
376
377   //-PolicyStore
378 }
379
380 object MongoDBStore {
381   object JsonNames {
382     final val _id = "_id"
383   }
384
385   /**
386    * Collection holding the [[gr.grnet.aquarium.event.ResourceEvent]]s.
387    *
388    * Resource events are coming from all systems handling billable resources.
389    */
390   final val RESOURCE_EVENTS_COLLECTION = "resevents"
391
392   /**
393    * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
394    *
395    * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
396    */
397   final val USER_STATES_COLLECTION = "userstates"
398
399   /**
400    * Collection holding [[gr.grnet.aquarium.event.im.IMEventModel]]s.
401    *
402    * User events are coming from the IM module (external).
403    */
404   final val IM_EVENTS_COLLECTION = "imevents"
405
406   /**
407    * Collection holding [[gr.grnet.aquarium.event.im.IMEventModel]]s that could not be parsed to normal objects.
408    *
409    * We of course assume at least a valid JSON representation.
410    *
411    * User events are coming from the IM module (external).
412    */
413   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
414
415   /**
416    * Collection holding [[gr.grnet.aquarium.event.WalletEntry]].
417    *
418    * Wallet entries are generated internally in Aquarium.
419    */
420   final val WALLET_ENTRIES_COLLECTION = "wallets"
421
422   /**
423    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
424    */
425 //  final val POLICIES_COLLECTION = "policies"
426
427   /**
428    * Collection holding [[gr.grnet.aquarium.event.PolicyEntry]].
429    */
430   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
431
432   /* TODO: Some of the following methods rely on JSON (de-)serialization).
433   * A method based on proper object serialization would be much faster.
434   */
435   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
436     ResourceEvent.fromJson(JSON.serialize(dbObject))
437   }
438
439   def dbObjectToUserState(dbObj: DBObject): UserState = {
440     UserState.fromJson(JSON.serialize(dbObj))
441   }
442
443   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
444     WalletEntry.fromJson(JSON.serialize(dbObj))
445   }
446
447   def dbObjectToIMEvent(dbObj: DBObject): MongoDBIMEvent = {
448     MongoDBIMEvent.fromJson(JSON.serialize(dbObj))
449   }
450
451   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
452     PolicyEntry.fromJson(JSON.serialize(dbObj))
453   }
454
455   def findById[A >: Null <: AnyRef](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] =
456     Maybe {
457     val query = new BasicDBObject(ResourceJsonNames.id, id)
458     val cursor = collection find query
459
460     try {
461       if(cursor.hasNext)
462         deserializer apply cursor.next
463       else
464         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
465     } finally {
466       cursor.close()
467     }
468   }
469
470   def runQuery[A <: AquariumEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
471                                   (deserializer: (DBObject) => A)
472                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
473     val cursor0 = collection find query
474     val cursor = if(orderBy ne null) {
475       cursor0 sort orderBy
476     } else {
477       cursor0
478     } // I really know that docs say that it is the same cursor.
479
480     if(!cursor.hasNext) {
481       cursor.close()
482       Nil
483     } else {
484       val buff = new ListBuffer[A]()
485
486       while(cursor.hasNext) {
487         buff += deserializer apply cursor.next
488       }
489
490       cursor.close()
491
492       sortWith match {
493         case Some(sorter) => buff.toList.sortWith(sorter)
494         case None => buff.toList
495       }
496     }
497   }
498
499   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
500     Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject))
501   }
502   
503   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
504     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
505   }
506
507   def storeAny[A](any: A,
508                   collection: DBCollection,
509                   idName: String,
510                   idValueProvider: (A) => String,
511                   serializer: (A) => DBObject) : RecordID = {
512
513     val dbObject = serializer apply any
514     val _id = new ObjectId()
515     dbObject.put("_id", _id)
516     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
517     writeResult.getLastError().throwOnError()
518
519     RecordID(dbObject.get("_id").toString)
520   }
521
522   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : ObjectId = {
523     val dbObject = serializer apply obj
524     val objectId = obj._id  match {
525       case null ⇒
526         val _id = new ObjectId()
527         dbObject.put("_id", _id)
528         _id
529
530       case _id ⇒
531         _id
532     }
533
534     dbObject.put(JsonNames._id, objectId)
535
536     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
537     writeResult.getLastError().throwOnError()
538
539     objectId
540   }
541
542   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
543     Conversions.jsonSupportToDBObject(jsonSupport)
544   }
545
546   final def isLocalIMEvent(event: IMEventModel) = event match {
547     case _: MongoDBIMEvent ⇒ true
548     case _ ⇒ false
549   }
550
551   final def createIMEventFromJson(json: String) = {
552     MongoDBIMEvent.fromJson(json)
553   }
554
555   final def createIMEventFromOther(event: IMEventModel) = {
556     MongoDBIMEvent.fromOther(event)
557   }
558
559   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
560     MongoDBIMEvent.fromJsonBytes(jsonBytes)
561   }
562 }