WIP: Remodeling events
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.user.UserState
40 import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames}
41 import gr.grnet.aquarium.util.json.JsonSupport
42 import collection.mutable.ListBuffer
43 import gr.grnet.aquarium.events.im.IMEventModel.{Names => IMEventNames}
44 import gr.grnet.aquarium.store._
45 import gr.grnet.aquarium.events.ResourceEvent.{JsonNames => ResourceJsonNames}
46 import gr.grnet.aquarium.events.WalletEntry.{JsonNames => WalletJsonNames}
47 import gr.grnet.aquarium.events.PolicyEntry.{JsonNames => PolicyJsonNames}
48 import java.util.Date
49 import gr.grnet.aquarium.logic.accounting.Policy
50 import com.mongodb._
51 import org.bson.types.ObjectId
52 import gr.grnet.aquarium.events._
53 import com.ckkloverdos.maybe.{NoVal, Maybe}
54 import im.IMEventModel
55 import gr.grnet.aquarium.util._
56 import gr.grnet.aquarium.converter.StdConverters
57
58 /**
59  * Mongodb implementation of the various aquarium stores.
60  *
61  * @author Christos KK Loverdos <loverdos@gmail.com>
62  * @author Georgios Gousios <gousiosg@gmail.com>
63  */
64 class MongoDBStore(
65     val mongo: Mongo,
66     val database: String,
67     val username: String,
68     val password: String)
69   extends ResourceEventStore
70   with UserStateStore
71   with WalletEntryStore
72   with IMEventStore
73   with PolicyStore
74   with Loggable {
75
76   override type IMEvent = MongoDBIMEvent
77
78   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
79   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
80   private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
81   private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
82   private[store] lazy val walletEntries    = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
83   private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
84
85   private[this] def getCollection(name: String): DBCollection = {
86     val db = mongo.getDB(database)
87     //logger.debug("Authenticating to mongo")
88     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
89       throw new StoreException("Could not authenticate user %s".format(username))
90     }
91     db.getCollection(name)
92   }
93
94   private[this] def _sortByTimestampAsc[A <: AquariumEventModel](one: A, two: A): Boolean = {
95     if (one.occurredMillis > two.occurredMillis) false
96     else if (one.occurredMillis < two.occurredMillis) true
97     else true
98   }
99
100   private[this] def _sortByTimestampDesc[A <: AquariumEventSkeleton](one: A, two: A): Boolean = {
101     if (one.occurredMillis < two.occurredMillis) false
102     else if (one.occurredMillis > two.occurredMillis) true
103     else true
104   }
105
106   //+ResourceEventStore
107   def storeResourceEvent(event: ResourceEvent) = {
108     MongoDBStore.storeAny[ResourceEvent](
109       event,
110       resourceEvents,
111       ResourceJsonNames.id,
112       (e) => e.id,
113       MongoDBStore.jsonSupportToDBObject)
114   }
115
116   def findResourceEventById(id: String): Maybe[ResourceEvent] =
117     MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent)
118
119   def findResourceEventsByUserId(userId: String)
120                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
121     val query = new BasicDBObject(ResourceJsonNames.userId, userId)
122
123     MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
124   }
125
126   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
127     val query = new BasicDBObject()
128     query.put(ResourceJsonNames.userId, userId)
129     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp))
130     
131     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
132
133     val cursor = resourceEvents.find(query).sort(sort)
134
135     try {
136       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
137       while(cursor.hasNext) {
138         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
139       }
140       buffer.toList.sortWith(_sortByTimestampAsc)
141     } finally {
142       cursor.close()
143     }
144   }
145
146   def findResourceEventHistory(userId: String, resName: String,
147                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
148     val query = new BasicDBObject()
149     query.put(ResourceJsonNames.userId, userId)
150     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
151     query.put(ResourceJsonNames.resource, resName)
152
153     instid match {
154       case Some(id) =>
155         Policy.policy.findResource(resName) match {
156           case Some(y) => query.put(ResourceJsonNames.details,
157             new BasicDBObject(y.descriminatorField, instid.get))
158           case None =>
159         }
160       case None =>
161     }
162
163     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
164     val cursor = resourceEvents.find(query).sort(sort)
165
166     try {
167       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
168       while(cursor.hasNext) {
169         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
170       }
171       buffer.toList.sortWith(_sortByTimestampAsc)
172     } finally {
173       cursor.close()
174     }
175   }
176
177   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
178     val query = new BasicDBObject()
179     query.put(ResourceJsonNames.userId, userId)
180     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
181     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
182
183     // Sort them by increasing order for occurred time
184     val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
185
186     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None)
187   }
188   
189   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
190     Maybe {
191       // FIXME: Implement
192       0L
193     }
194   }
195
196   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
197                                                     startMillis: Long,
198                                                     stopMillis: Long): List[ResourceEvent] = {
199     // FIXME: Implement
200     Nil
201   }
202   //-ResourceEventStore
203
204   //+ UserStateStore
205   def storeUserState(userState: UserState): Maybe[RecordID] = {
206     MongoDBStore.storeUserState(userState, userStates)
207   }
208
209   def findUserStateByUserId(userId: String): Maybe[UserState] = {
210     Maybe {
211       val query = new BasicDBObject(UserStateJsonNames.userId, userId)
212       val cursor = userStates find query
213
214       try {
215         if(cursor.hasNext)
216           MongoDBStore.dbObjectToUserState(cursor.next())
217         else
218           null
219       } finally {
220         cursor.close()
221       }
222     }
223   }
224
225   def findLatestUserStateForEndOfBillingMonth(userId: String,
226                                               yearOfBillingMonth: Int,
227                                               billingMonth: Int): Maybe[UserState] = {
228     NoVal // FIXME: implement
229   }
230
231   def deleteUserState(userId: String) = {
232     val query = new BasicDBObject(UserStateJsonNames.userId, userId)
233     userStates.findAndRemove(query)
234   }
235   //- UserStateStore
236
237   //+WalletEntryStore
238   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
239     Maybe {
240       MongoDBStore.storeAny[WalletEntry](
241         entry,
242         walletEntries,
243         ResourceJsonNames.id,
244         (e) => e.id,
245         MongoDBStore.jsonSupportToDBObject)
246     }
247   }
248
249   def findWalletEntryById(id: String): Maybe[WalletEntry] =
250     MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
251
252   def findUserWalletEntries(userId: String) = {
253     // TODO: optimize
254     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
255   }
256
257   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
258     val q = new BasicDBObject()
259     // TODO: Is this the correct way for an AND query?
260     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
261     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
262     q.put(WalletJsonNames.userId, userId)
263
264     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
265   }
266
267   def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
268     val q = new BasicDBObject()
269     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
270     q.put(WalletJsonNames.userId, userId)
271
272     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
273   }
274
275   def findLatestUserWalletEntries(userId: String) = {
276     Maybe {
277       val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
278       val cursor = walletEntries.find().sort(orderBy)
279
280       try {
281         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
282         if(cursor.hasNext) {
283           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
284           buffer += walletEntry
285
286           var _previousOccurredMillis = walletEntry.occurredMillis
287           var _ok = true
288
289           while(cursor.hasNext && _ok) {
290             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
291             var currentOccurredMillis = walletEntry.occurredMillis
292             _ok = currentOccurredMillis == _previousOccurredMillis
293             
294             if(_ok) {
295               buffer += walletEntry
296             }
297           }
298
299           buffer.toList
300         } else {
301           null
302         }
303       } finally {
304         cursor.close()
305       }
306     }
307   }
308
309   def findPreviousEntry(userId: String, resource: String,
310                         instanceId: String,
311                         finalized: Option[Boolean]): List[WalletEntry] = {
312     val q = new BasicDBObject()
313     q.put(WalletJsonNames.userId, userId)
314     q.put(WalletJsonNames.resource, resource)
315     q.put(WalletJsonNames.instanceId, instanceId)
316     finalized match {
317       case Some(x) => q.put(WalletJsonNames.finalized, x)
318       case None =>
319     }
320
321     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
322   }
323   //-WalletEntryStore
324
325   //+IMEventStore
326   def isLocalIMEvent(event: IMEventModel) = {
327     MongoDBStore.isLocalIMEvent(event)
328   }
329
330   def createIMEventFromJson(json: String) = {
331     MongoDBStore.createIMEventFromJson(json)
332   }
333
334   def createIMEventFromOther(event: IMEventModel) = {
335     MongoDBStore.createIMEventFromOther(event)
336   }
337
338   def storeUnparsed(json: String): Maybe[RecordID] = {
339     MongoDBStore.storeJustJson(json, unparsedIMEvents)
340   }
341
342   def storeIMEvent(_event: IMEventModel): RecordID = {
343     val event = createIMEventFromOther(_event)
344     MongoDBStore.storeAny[IMEvent](
345       event,
346       imEvents,
347       IMEventNames.userID,
348       _.userID,
349       MongoDBStore.jsonSupportToDBObject
350     )
351   }
352
353   def findIMEventById(id: String): Maybe[IMEvent] =
354     MongoDBStore.findById[IMEvent](id, imEvents, MongoDBStore.dbObjectToIMEvent)
355
356   def findIMEventsByUserId(userId: String): List[IMEvent] = {
357     val query = new BasicDBObject(IMEventNames.userID, userId)
358     MongoDBStore.runQuery(query, imEvents)(MongoDBStore.dbObjectToIMEvent)(Some(_sortByTimestampAsc))
359   }
360   //-IMEventStore
361
362   //+PolicyStore
363   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
364     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
365       new BasicDBObject("$gt", after))
366     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
367   }
368
369   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
370
371
372   def updatePolicyEntry(policy: PolicyEntry) = {
373     //Find the entry
374     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
375     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
376     policyEntries.update(query, policyObject, true, false)
377   }
378   
379   def findPolicyEntry(id: String) =
380     MongoDBStore.findById[PolicyEntry](id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
381
382   //-PolicyStore
383 }
384
385 object MongoDBStore {
386   object JsonNames {
387     final val _id = "_id"
388   }
389
390   /**
391    * Collection holding the [[gr.grnet.aquarium.events.ResourceEvent]]s.
392    *
393    * Resource events are coming from all systems handling billable resources.
394    */
395   final val RESOURCE_EVENTS_COLLECTION = "resevents"
396
397   /**
398    * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
399    *
400    * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
401    */
402   final val USER_STATES_COLLECTION = "userstates"
403
404   /**
405    * Collection holding [[gr.grnet.aquarium.events.im.IMEventModel]]s.
406    *
407    * User events are coming from the IM module (external).
408    */
409   final val IM_EVENTS_COLLECTION = "imevents"
410
411   /**
412    * Collection holding [[gr.grnet.aquarium.events.im.IMEventModel]]s that could not be parsed to normal objects.
413    *
414    * We of course assume at least a valid JSON representation.
415    *
416    * User events are coming from the IM module (external).
417    */
418   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
419
420   /**
421    * Collection holding [[gr.grnet.aquarium.events.WalletEntry]].
422    *
423    * Wallet entries are generated internally in Aquarium.
424    */
425   final val WALLET_ENTRIES_COLLECTION = "wallets"
426
427   /**
428    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
429    */
430 //  final val POLICIES_COLLECTION = "policies"
431
432   /**
433    * Collection holding [[gr.grnet.aquarium.events.PolicyEntry]].
434    */
435   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
436
437   /* TODO: Some of the following methods rely on JSON (de-)serialization).
438   * A method based on proper object serialization would be much faster.
439   */
440   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
441     ResourceEvent.fromJson(JSON.serialize(dbObject))
442   }
443
444   def dbObjectToUserState(dbObj: DBObject): UserState = {
445     UserState.fromJson(JSON.serialize(dbObj))
446   }
447
448   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
449     WalletEntry.fromJson(JSON.serialize(dbObj))
450   }
451
452   def dbObjectToIMEvent(dbObj: DBObject): MongoDBIMEvent = {
453     MongoDBIMEvent.fromJson(JSON.serialize(dbObj))
454   }
455
456   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
457     PolicyEntry.fromJson(JSON.serialize(dbObj))
458   }
459
460   def findById[A >: Null <: AnyRef](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] =
461     Maybe {
462     val query = new BasicDBObject(ResourceJsonNames.id, id)
463     val cursor = collection find query
464
465     try {
466       if(cursor.hasNext)
467         deserializer apply cursor.next
468       else
469         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
470     } finally {
471       cursor.close()
472     }
473   }
474
475   def runQuery[A <: AquariumEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
476                                   (deserializer: (DBObject) => A)
477                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
478     val cursor0 = collection find query
479     val cursor = if(orderBy ne null) {
480       cursor0 sort orderBy
481     } else {
482       cursor0
483     } // I really know that docs say that it is the same cursor.
484
485     if(!cursor.hasNext) {
486       cursor.close()
487       Nil
488     } else {
489       val buff = new ListBuffer[A]()
490
491       while(cursor.hasNext) {
492         buff += deserializer apply cursor.next
493       }
494
495       cursor.close()
496
497       sortWith match {
498         case Some(sorter) => buff.toList.sortWith(sorter)
499         case None => buff.toList
500       }
501     }
502   }
503
504   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
505     Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject))
506   }
507   
508   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
509     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
510   }
511
512   def storeJustJson(json: String, collection: DBCollection): Maybe[RecordID] = {
513     Maybe {
514       val dbObj = jsonStringToDBObject(json)
515       val writeResult = collection insert dbObj
516       writeResult.getLastError().throwOnError()
517       val objectId = dbObj.get("_id").asInstanceOf[ObjectId]
518
519       RecordID(objectId.toString)
520     }
521   }
522
523   def storeAny[A](any: A,
524                   collection: DBCollection,
525                   idName: String,
526                   idValueProvider: (A) => String,
527                   serializer: (A) => DBObject) : RecordID = {
528
529     val dbObject = serializer apply any
530     val _id = new ObjectId()
531     dbObject.put("_id", _id)
532     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
533     writeResult.getLastError().throwOnError()
534
535     RecordID(dbObject.get("_id").toString)
536   }
537
538   def jsonSupportToDBObject(jsonSupport: JsonSupport): DBObject = {
539     StdConverters.StdConverters.convertEx[DBObject](jsonSupport)
540   }
541
542   def jsonStringToDBObject(jsonString: String): DBObject = {
543     StdConverters.StdConverters.convertEx[DBObject](jsonString)
544   }
545
546   final def isLocalIMEvent(event: IMEventModel) = event match {
547     case _: MongoDBIMEvent ⇒ true
548     case _ ⇒ false
549   }
550
551   final def createIMEventFromJson(json: String) = {
552     MongoDBIMEvent.fromJson(json)
553   }
554
555   final def createIMEventFromOther(event: IMEventModel) = {
556     MongoDBIMEvent.fromOther(event)
557   }
558
559   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
560     MongoDBIMEvent.fromJsonBytes(jsonBytes)
561   }
562 }