04ecf59ce90ffdc4600195c4db566c30a7d70272
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.state.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
48 import gr.grnet.aquarium.logic.accounting.Policy
49 import com.mongodb._
50 import org.bson.types.ObjectId
51 import com.ckkloverdos.maybe.Maybe
52 import gr.grnet.aquarium.util._
53 import gr.grnet.aquarium.converter.Conversions
54 import gr.grnet.aquarium.computation.state.UserState
55 import gr.grnet.aquarium.event.model.{ExternalEventModel, PolicyEntry}
56 import gr.grnet.aquarium.computation.BillingMonthInfo
57 import gr.grnet.aquarium.Aquarium
58
59 /**
60  * Mongodb implementation of the various aquarium stores.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  */
65 class MongoDBStore(
66     val mongo: Mongo,
67     val database: String,
68     val username: String,
69     val password: String)
70   extends ResourceEventStore
71   with UserStateStore
72   with IMEventStore
73   with PolicyStore
74   with Loggable {
75
76   override type IMEvent = MongoDBIMEvent
77   override type ResourceEvent = MongoDBResourceEvent
78
79   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
80   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
81   private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
82   private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
83   private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
84
85   private[this] def getCollection(name: String): DBCollection = {
86     val db = mongo.getDB(database)
87     //logger.debug("Authenticating to mongo")
88     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
89       throw new StoreException("Could not authenticate user %s".format(username))
90     }
91     db.getCollection(name)
92   }
93
94   private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
95     if (one.occurredMillis > two.occurredMillis) false
96     else if (one.occurredMillis < two.occurredMillis) true
97     else true
98   }
99
100   //+ResourceEventStore
101   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
102     MongoDBResourceEvent.fromOther(event, null)
103   }
104
105   def pingResourceEventStore(): Unit = synchronized {
106     MongoDBStore.ping(mongo)
107   }
108
109   def insertResourceEvent(event: ResourceEventModel) = {
110     val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
111     MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
112     localEvent
113   }
114
115   def findResourceEventById(id: String): Option[ResourceEvent] = {
116     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
117   }
118
119   def findResourceEventsByUserId(userId: String)
120                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
121     val query = new BasicDBObject(ResourceEventNames.userID, userId)
122
123     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
124   }
125
126   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
127     val query = new BasicDBObject()
128     query.put(ResourceEventNames.userID, userId)
129     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
130     
131     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
132
133     val cursor = resourceEvents.find(query).sort(sort)
134
135     try {
136       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
137       while(cursor.hasNext) {
138         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
139       }
140       buffer.toList.sortWith(_sortByTimestampAsc)
141     } finally {
142       cursor.close()
143     }
144   }
145
146   def findResourceEventHistory(userId: String, resName: String,
147                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
148     val query = new BasicDBObject()
149     query.put(ResourceEventNames.userID, userId)
150     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
151     query.put(ResourceEventNames.resource, resName)
152
153     instid match {
154       case Some(id) =>
155         Policy.policy.findResource(resName) match {
156           case Some(y) => query.put(ResourceEventNames.details,
157             new BasicDBObject(y.descriminatorField, instid.get))
158           case None =>
159         }
160       case None =>
161     }
162
163     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
164     val cursor = resourceEvents.find(query).sort(sort)
165
166     try {
167       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
168       while(cursor.hasNext) {
169         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
170       }
171       buffer.toList.sortWith(_sortByTimestampAsc)
172     } finally {
173       cursor.close()
174     }
175   }
176
177   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
178     val query = new BasicDBObject()
179     query.put(ResourceEventNames.userID, userId)
180     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
181     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
182
183     // Sort them by increasing order for occurred time
184     val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
185
186     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
187   }
188   
189   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
190     val query = new BasicDBObjectBuilder().
191       add(ResourceEventModel.Names.userID, userID).
192       // received within the period
193       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)).
194       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)).
195       // occurred outside the period
196       add("$or", {
197         val dbList = new BasicDBList()
198         dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
199         dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
200         dbList
201       }).
202       get()
203
204     resourceEvents.count(query)
205   }
206
207   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
208                                                     startMillis: Long,
209                                                     stopMillis: Long): List[ResourceEvent] = {
210     // FIXME: Implement
211     Nil
212   }
213   //-ResourceEventStore
214
215   //+ UserStateStore
216   def insertUserState(userState: UserState) = {
217     MongoDBStore.insertUserState(
218       userState.copy(_id = new ObjectId().toString),
219       userStates,
220       MongoDBStore.jsonSupportToDBObject
221     )
222   }
223
224   def findUserStateByUserID(userID: String): Option[UserState] = {
225     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
226     val cursor = userStates find query
227
228     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
229   }
230
231   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
232     val query = new BasicDBObjectBuilder().
233       add(UserState.JsonNames.userID, userID).
234       add(UserState.JsonNames.isFullBillingMonthState, true).
235       add(UserState.JsonNames.theFullBillingMonth_year, bmi.year).
236       add(UserState.JsonNames.theFullBillingMonth_month, bmi.month).
237       get()
238
239     logger.debug("findLatestUserStateForFullMonthBilling(%s, %s) query: %s".format(userID, bmi, query))
240
241     // Descending order, so that the latest comes first
242     val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
243
244     val cursor = userStates.find(query).sort(sorter)
245
246     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
247   }
248   //- UserStateStore
249
250   //+IMEventStore
251   def createIMEventFromJson(json: String) = {
252     MongoDBStore.createIMEventFromJson(json)
253   }
254
255   def createIMEventFromOther(event: IMEventModel) = {
256     MongoDBStore.createIMEventFromOther(event)
257   }
258
259   def pingIMEventStore(): Unit = {
260     MongoDBStore.ping(mongo)
261   }
262
263   def insertIMEvent(event: IMEventModel): IMEvent = {
264     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
265     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
266     localEvent
267   }
268
269   def findIMEventById(id: String): Option[IMEvent] = {
270     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
271   }
272
273
274   /**
275    * Find the `CREATE` even for the given user. Note that there must be only one such event.
276    */
277   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
278     val query = new BasicDBObjectBuilder().
279       add(IMEventNames.userID, userID).
280       add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
281
282     // Normally one such event is allowed ...
283     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
284
285     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
286   }
287
288   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
289     val query = new BasicDBObject(IMEventNames.userID, userID)
290     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
291
292     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
293   }
294
295   /**
296    * Find the very first activation event for a particular user.
297    *
298    */
299   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
300     val query = new BasicDBObjectBuilder().
301       add(IMEventNames.userID, userID).
302       add(IMEventNames.isActive, true).get()
303
304     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
305
306     withCloseable(cursor) { cursor ⇒
307       if(cursor.hasNext) {
308         Some(MongoDBIMEvent.fromDBObject(cursor.next()))
309       } else {
310         None
311       }
312    }
313   }
314
315   /**
316    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
317    * the given function `f`.
318    *
319    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
320    */
321   def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
322     val query = new BasicDBObject(IMEventNames.userID, userID)
323     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
324
325     withCloseable(cursor) { cursor ⇒
326       while(cursor.hasNext) {
327         val model = MongoDBIMEvent.fromDBObject(cursor.next())
328         f(model)
329       }
330     }
331   }
332   //-IMEventStore
333
334
335
336   //+PolicyStore
337   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
338     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
339       new BasicDBObject("$gt", after))
340     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
341   }
342
343   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
344
345
346   def updatePolicyEntry(policy: PolicyEntry) = {
347     //Find the entry
348     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
349     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
350     policyEntries.update(query, policyObject, true, false)
351   }
352   
353   def findPolicyEntry(id: String) = {
354     MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
355   }
356
357   //-PolicyStore
358 }
359
360 object MongoDBStore {
361   object JsonNames {
362     final val _id = "_id"
363   }
364
365   /**
366    * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
367    *
368    * Resource events are coming from all systems handling billable resources.
369    */
370   final val RESOURCE_EVENTS_COLLECTION = "resevents"
371
372   /**
373    * Collection holding the snapshots of [[gr.grnet.aquarium.computation.state.UserState]].
374    *
375    * [[gr.grnet.aquarium.computation.state.UserState]] is held internally within
376    * [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
377    */
378   final val USER_STATES_COLLECTION = "userstates"
379
380   /**
381    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
382    *
383    * User events are coming from the IM module (external).
384    */
385   final val IM_EVENTS_COLLECTION = "imevents"
386
387   /**
388    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects.
389    *
390    * We of course assume at least a valid JSON representation.
391    *
392    * User events are coming from the IM module (external).
393    */
394   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
395
396   /**
397    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
398    */
399 //  final val POLICIES_COLLECTION = "policies"
400
401   /**
402    * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]].
403    */
404   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
405
406   def dbObjectToUserState(dbObj: DBObject): UserState = {
407     UserState.fromJson(JSON.serialize(dbObj))
408   }
409
410   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
411     withCloseable(cursor) { cursor ⇒
412       if(cursor.hasNext) {
413         Some(f(cursor.next()))
414       } else {
415         None
416       }
417     }
418   }
419
420   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
421     PolicyEntry.fromJson(JSON.serialize(dbObj))
422   }
423
424   def ping(mongo: Mongo): Unit = synchronized {
425     // This requires a network roundtrip
426     mongo.isLocked
427   }
428
429   def findBy[A >: Null <: AnyRef](name: String,
430                                   value: String,
431                                   collection: DBCollection,
432                                   deserializer: (DBObject) => A) : Option[A] = {
433     val query = new BasicDBObject(name, value)
434     val cursor = collection find query
435
436     withCloseable(cursor) { cursor ⇒
437       if(cursor.hasNext)
438         Some(deserializer apply cursor.next)
439       else
440         None
441     }
442   }
443
444   def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
445                                   (deserializer: (DBObject) => A)
446                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
447     val cursor0 = collection find query
448     val cursor = if(orderBy ne null) {
449       cursor0 sort orderBy
450     } else {
451       cursor0
452     } // I really know that docs say that it is the same cursor.
453
454     if(!cursor.hasNext) {
455       cursor.close()
456       Nil
457     } else {
458       val buff = new ListBuffer[A]()
459
460       while(cursor.hasNext) {
461         buff += deserializer apply cursor.next
462       }
463
464       cursor.close()
465
466       sortWith match {
467         case Some(sorter) => buff.toList.sortWith(sorter)
468         case None => buff.toList
469       }
470     }
471   }
472
473   def storeUserState(userState: UserState, collection: DBCollection) = {
474     storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
475   }
476   
477   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
478     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
479   }
480
481   def storeAny[A](any: A,
482                   collection: DBCollection,
483                   idName: String,
484                   idValueProvider: (A) => String,
485                   serializer: (A) => DBObject) : RecordID = {
486
487     val dbObject = serializer apply any
488     val _id = new ObjectId()
489     dbObject.put("_id", _id)
490     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
491     writeResult.getLastError().throwOnError()
492
493     RecordID(dbObject.get("_id").toString)
494   }
495
496   // FIXME: consolidate
497   def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
498     val dbObject = serializer apply obj
499     val objectId = obj._id  match {
500       case null ⇒
501         val _id = new ObjectId()
502         dbObject.put("_id", _id)
503         _id
504
505       case _id ⇒
506         _id
507     }
508
509     dbObject.put(JsonNames._id, objectId)
510
511     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
512
513     obj
514   }
515
516   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
517     val dbObject = serializer apply obj
518     val objectId = obj._id  match {
519       case null ⇒
520         val _id = new ObjectId()
521         dbObject.put("_id", _id)
522         _id
523
524       case _id ⇒
525         _id
526     }
527
528     dbObject.put(JsonNames._id, objectId)
529
530     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
531   }
532
533   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
534     Conversions.jsonSupportToDBObject(jsonSupport)
535   }
536
537   final def isLocalIMEvent(event: IMEventModel) = event match {
538     case _: MongoDBIMEvent ⇒ true
539     case _ ⇒ false
540   }
541
542   final def createIMEventFromJson(json: String) = {
543     MongoDBIMEvent.fromJsonString(json)
544   }
545
546   final def createIMEventFromOther(event: IMEventModel) = {
547     MongoDBIMEvent.fromOther(event, null)
548   }
549
550   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
551     MongoDBIMEvent.fromJsonBytes(jsonBytes)
552   }
553 }