617471f11c1c1842edb96fe21bc5b2a07d572d42
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
48 import gr.grnet.aquarium.logic.accounting.Policy
49 import com.mongodb._
50 import org.bson.types.ObjectId
51 import com.ckkloverdos.maybe.Maybe
52 import gr.grnet.aquarium.util._
53 import gr.grnet.aquarium.converter.Conversions
54 import gr.grnet.aquarium.computation.UserState
55 import gr.grnet.aquarium.event.model.{ExternalEventModel, PolicyEntry}
56
57 /**
58  * Mongodb implementation of the various aquarium stores.
59  *
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  */
63 class MongoDBStore(
64     val mongo: Mongo,
65     val database: String,
66     val username: String,
67     val password: String)
68   extends ResourceEventStore
69   with UserStateStore
70   with IMEventStore
71   with PolicyStore
72   with Loggable {
73
74   override type IMEvent = MongoDBIMEvent
75   override type ResourceEvent = MongoDBResourceEvent
76
77   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
78   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
79   private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
80   private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
81   private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
82
83   private[this] def getCollection(name: String): DBCollection = {
84     val db = mongo.getDB(database)
85     //logger.debug("Authenticating to mongo")
86     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
87       throw new StoreException("Could not authenticate user %s".format(username))
88     }
89     db.getCollection(name)
90   }
91
92   private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
93     if (one.occurredMillis > two.occurredMillis) false
94     else if (one.occurredMillis < two.occurredMillis) true
95     else true
96   }
97
98   //+ResourceEventStore
99   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
100     MongoDBResourceEvent.fromOther(event, null)
101   }
102
103   def pingResourceEventStore(): Unit = synchronized {
104     MongoDBStore.ping(mongo)
105   }
106
107   def insertResourceEvent(event: ResourceEventModel) = {
108     val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
109     MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
110     localEvent
111   }
112
113   def findResourceEventById(id: String): Option[ResourceEvent] = {
114     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
115   }
116
117   def findResourceEventsByUserId(userId: String)
118                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
119     val query = new BasicDBObject(ResourceEventNames.userID, userId)
120
121     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
122   }
123
124   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
125     val query = new BasicDBObject()
126     query.put(ResourceEventNames.userID, userId)
127     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
128     
129     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
130
131     val cursor = resourceEvents.find(query).sort(sort)
132
133     try {
134       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
135       while(cursor.hasNext) {
136         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
137       }
138       buffer.toList.sortWith(_sortByTimestampAsc)
139     } finally {
140       cursor.close()
141     }
142   }
143
144   def findResourceEventHistory(userId: String, resName: String,
145                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
146     val query = new BasicDBObject()
147     query.put(ResourceEventNames.userID, userId)
148     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
149     query.put(ResourceEventNames.resource, resName)
150
151     instid match {
152       case Some(id) =>
153         Policy.policy.findResource(resName) match {
154           case Some(y) => query.put(ResourceEventNames.details,
155             new BasicDBObject(y.descriminatorField, instid.get))
156           case None =>
157         }
158       case None =>
159     }
160
161     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
162     val cursor = resourceEvents.find(query).sort(sort)
163
164     try {
165       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
166       while(cursor.hasNext) {
167         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
168       }
169       buffer.toList.sortWith(_sortByTimestampAsc)
170     } finally {
171       cursor.close()
172     }
173   }
174
175   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
176     val query = new BasicDBObject()
177     query.put(ResourceEventNames.userID, userId)
178     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
179     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
180
181     // Sort them by increasing order for occurred time
182     val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
183
184     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
185   }
186   
187   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
188     val query = new BasicDBObjectBuilder().
189       add(ResourceEventModel.Names.userID, userID).
190       // received within the period
191       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)).
192       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)).
193       // occurred outside the period
194       add("$or", {
195         val dbList = new BasicDBList()
196         dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
197         dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
198         dbList
199       }).
200       get()
201
202     resourceEvents.count(query)
203   }
204
205   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
206                                                     startMillis: Long,
207                                                     stopMillis: Long): List[ResourceEvent] = {
208     // FIXME: Implement
209     Nil
210   }
211   //-ResourceEventStore
212
213   //+ UserStateStore
214   def insertUserState(userState: UserState) = {
215     MongoDBStore.insertUserState(
216       userState.copy(_id = new ObjectId().toString),
217       userStates,
218       MongoDBStore.jsonSupportToDBObject
219     )
220   }
221
222   def findUserStateByUserID(userID: String): Option[UserState] = {
223     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
224     val cursor = userStates find query
225
226     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
227   }
228
229   def findLatestUserStateForEndOfBillingMonth(userID: String,
230                                               yearOfBillingMonth: Int,
231                                               billingMonth: Int): Option[UserState] = {
232     val query = new BasicDBObjectBuilder().
233       add(UserState.JsonNames.userID, userID).
234       add(UserState.JsonNames.isFullBillingMonthState, true).
235       add(UserState.JsonNames.theFullBillingMonth.year, yearOfBillingMonth).
236       add(UserState.JsonNames.theFullBillingMonth.month, billingMonth).
237       get()
238
239     // Descending order, so that the latest comes first
240     val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
241
242     val cursor = userStates.find(query).sort(sorter)
243
244     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
245   }
246
247   def deleteUserState(userId: String) = {
248     val query = new BasicDBObject(UserStateJsonNames.userID, userId)
249     userStates.findAndRemove(query)
250   }
251   //- UserStateStore
252
253   //+IMEventStore
254   def createIMEventFromJson(json: String) = {
255     MongoDBStore.createIMEventFromJson(json)
256   }
257
258   def createIMEventFromOther(event: IMEventModel) = {
259     MongoDBStore.createIMEventFromOther(event)
260   }
261
262   def pingIMEventStore(): Unit = {
263     MongoDBStore.ping(mongo)
264   }
265
266   def insertIMEvent(event: IMEventModel): IMEvent = {
267     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
268     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
269     localEvent
270   }
271
272   def findIMEventById(id: String): Option[IMEvent] = {
273     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
274   }
275
276
277   /**
278    * Find the `CREATE` even for the given user. Note that there must be only one such event.
279    */
280   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
281     val query = new BasicDBObjectBuilder().
282       add(IMEventNames.userID, userID).
283       add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
284
285     // Normally one such event is allowed ...
286     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
287
288     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
289   }
290
291   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
292     val query = new BasicDBObject(IMEventNames.userID, userID)
293     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
294
295     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
296   }
297
298   /**
299    * Find the very first activation event for a particular user.
300    *
301    */
302   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
303     val query = new BasicDBObjectBuilder().
304       add(IMEventNames.userID, userID).
305       add(IMEventNames.isActive, true).get()
306
307     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
308
309     withCloseable(cursor) { cursor ⇒
310       if(cursor.hasNext) {
311         Some(MongoDBIMEvent.fromDBObject(cursor.next()))
312       } else {
313         None
314       }
315    }
316   }
317
318   /**
319    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
320    * the given function `f`.
321    *
322    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
323    */
324   def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
325     val query = new BasicDBObject(IMEventNames.userID, userID)
326     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
327
328     withCloseable(cursor) { cursor ⇒
329       while(cursor.hasNext) {
330         val model = MongoDBIMEvent.fromDBObject(cursor.next())
331         f(model)
332       }
333     }
334   }
335   //-IMEventStore
336
337
338
339   //+PolicyStore
340   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
341     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
342       new BasicDBObject("$gt", after))
343     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
344   }
345
346   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
347
348
349   def updatePolicyEntry(policy: PolicyEntry) = {
350     //Find the entry
351     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
352     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
353     policyEntries.update(query, policyObject, true, false)
354   }
355   
356   def findPolicyEntry(id: String) = {
357     MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
358   }
359
360   //-PolicyStore
361 }
362
363 object MongoDBStore {
364   object JsonNames {
365     final val _id = "_id"
366   }
367
368   /**
369    * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
370    *
371    * Resource events are coming from all systems handling billable resources.
372    */
373   final val RESOURCE_EVENTS_COLLECTION = "resevents"
374
375   /**
376    * Collection holding the snapshots of [[gr.grnet.aquarium.computation.UserState]].
377    *
378    * [[gr.grnet.aquarium.computation.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user.UserActor]]s.
379    */
380   final val USER_STATES_COLLECTION = "userstates"
381
382   /**
383    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
384    *
385    * User events are coming from the IM module (external).
386    */
387   final val IM_EVENTS_COLLECTION = "imevents"
388
389   /**
390    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects.
391    *
392    * We of course assume at least a valid JSON representation.
393    *
394    * User events are coming from the IM module (external).
395    */
396   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
397
398   /**
399    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
400    */
401 //  final val POLICIES_COLLECTION = "policies"
402
403   /**
404    * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]].
405    */
406   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
407
408   def dbObjectToUserState(dbObj: DBObject): UserState = {
409     UserState.fromJson(JSON.serialize(dbObj))
410   }
411
412   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
413     withCloseable(cursor) { cursor ⇒
414       if(cursor.hasNext) {
415         Some(f(cursor.next()))
416       } else {
417         None
418       }
419     }
420   }
421
422   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
423     PolicyEntry.fromJson(JSON.serialize(dbObj))
424   }
425
426   def ping(mongo: Mongo): Unit = synchronized {
427     // This requires a network roundtrip
428     mongo.isLocked
429   }
430
431   def findBy[A >: Null <: AnyRef](name: String,
432                                   value: String,
433                                   collection: DBCollection,
434                                   deserializer: (DBObject) => A) : Option[A] = {
435     val query = new BasicDBObject(name, value)
436     val cursor = collection find query
437
438     withCloseable(cursor) { cursor ⇒
439       if(cursor.hasNext)
440         Some(deserializer apply cursor.next)
441       else
442         None
443     }
444   }
445
446   def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
447                                   (deserializer: (DBObject) => A)
448                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
449     val cursor0 = collection find query
450     val cursor = if(orderBy ne null) {
451       cursor0 sort orderBy
452     } else {
453       cursor0
454     } // I really know that docs say that it is the same cursor.
455
456     if(!cursor.hasNext) {
457       cursor.close()
458       Nil
459     } else {
460       val buff = new ListBuffer[A]()
461
462       while(cursor.hasNext) {
463         buff += deserializer apply cursor.next
464       }
465
466       cursor.close()
467
468       sortWith match {
469         case Some(sorter) => buff.toList.sortWith(sorter)
470         case None => buff.toList
471       }
472     }
473   }
474
475   def storeUserState(userState: UserState, collection: DBCollection) = {
476     storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
477   }
478   
479   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
480     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
481   }
482
483   def storeAny[A](any: A,
484                   collection: DBCollection,
485                   idName: String,
486                   idValueProvider: (A) => String,
487                   serializer: (A) => DBObject) : RecordID = {
488
489     val dbObject = serializer apply any
490     val _id = new ObjectId()
491     dbObject.put("_id", _id)
492     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
493     writeResult.getLastError().throwOnError()
494
495     RecordID(dbObject.get("_id").toString)
496   }
497
498   // FIXME: consolidate
499   def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
500     val dbObject = serializer apply obj
501     val objectId = obj._id  match {
502       case null ⇒
503         val _id = new ObjectId()
504         dbObject.put("_id", _id)
505         _id
506
507       case _id ⇒
508         _id
509     }
510
511     dbObject.put(JsonNames._id, objectId)
512
513     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
514
515     obj
516   }
517
518   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
519     val dbObject = serializer apply obj
520     val objectId = obj._id  match {
521       case null ⇒
522         val _id = new ObjectId()
523         dbObject.put("_id", _id)
524         _id
525
526       case _id ⇒
527         _id
528     }
529
530     dbObject.put(JsonNames._id, objectId)
531
532     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
533   }
534
535   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
536     Conversions.jsonSupportToDBObject(jsonSupport)
537   }
538
539   final def isLocalIMEvent(event: IMEventModel) = event match {
540     case _: MongoDBIMEvent ⇒ true
541     case _ ⇒ false
542   }
543
544   final def createIMEventFromJson(json: String) = {
545     MongoDBIMEvent.fromJsonString(json)
546   }
547
548   final def createIMEventFromOther(event: IMEventModel) = {
549     MongoDBIMEvent.fromOther(event, null)
550   }
551
552   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
553     MongoDBIMEvent.fromJsonBytes(jsonBytes)
554   }
555 }