Merge branch 'snapshots'
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import collection.immutable
39 import com.mongodb._
40 import gr.grnet.aquarium.computation.BillingMonthInfo
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
43 import gr.grnet.aquarium.message.MessageConstants
44 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util._
48 import gr.grnet.aquarium.util.Once
49 import gr.grnet.aquarium.util.json.JsonSupport
50 import gr.grnet.aquarium.{Aquarium, AquariumException}
51 import org.apache.avro.specific.SpecificRecord
52 import org.bson.types.ObjectId
53
54 /**
55  * Mongodb implementation of the various aquarium stores.
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  * @author Georgios Gousios <gousiosg@gmail.com>
59  * @author Prodromos Gerakios <pgerakio@grnet.gr>
60  */
61 class MongoDBStore(
62     val aquarium: Aquarium,
63     val mongo: Mongo,
64     val database: String,
65     val username: String,
66     val password: String)
67   extends ResourceEventStore
68   with UserStateStore
69   with IMEventStore
70   with PolicyStore
71   with Loggable {
72
73   private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection)
74   private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection)
75   private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
76   private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
77
78   private[store] lazy val indicesMap = {
79    val resev=  new BasicDBObjectBuilder().
80                       add(MongoDBStore.JsonNames.id,1).
81                       add(MongoDBStore.JsonNames.userID,1).
82                       add(MongoDBStore.JsonNames.occurredMillis,1).
83                       add(MongoDBStore.JsonNames.receivedMillis,1).get
84    val imev =  new BasicDBObjectBuilder().
85                  add(MongoDBStore.JsonNames.userID,1).
86                  add(MongoDBStore.JsonNames.eventType,"").
87                  add(MongoDBStore.JsonNames.occurredMillis,1).get
88    val policy = new BasicDBObjectBuilder().
89                  add("validFromMillis",1).
90                  add("validToMillis",1).get
91    val user = new BasicDBObjectBuilder().
92               add( "occurredMillis",1).
93               add("isFullBillingMonth",false).
94               add("billingYear",1).
95               add("billingMonth",1).
96               add("billingMonthDay",1).get
97     Map(MongoDBStore.ResourceEventCollection -> resev,
98         MongoDBStore.IMEventCollection-> imev,
99         MongoDBStore.PolicyCollection-> policy,
100         MongoDBStore.UserStateCollection-> user
101        )
102   }
103   private[this] val once = new Once()
104
105   private[this] def doAuthenticate(db: DB) {
106     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
107       throw new AquariumException("Could not authenticate user %s".format(username))
108     }
109   }
110
111   private[this] def getCollection(name: String): DBCollection = {
112     val db = mongo.getDB(database)
113     doAuthenticate(db)
114     once.run { /* this code is thread-safe and will run exactly once*/
115       indicesMap.foreach { case (collection,obj) =>
116         mongo.getDB(database).getCollection(collection).createIndex(obj)
117       }
118     }
119     db.getCollection(name)
120   }
121
122   //+ResourceEventStore
123   def pingResourceEventStore(): Unit = synchronized {
124     getCollection(MongoDBStore.ResourceEventCollection)
125     MongoDBStore.ping(mongo)
126   }
127
128   def insertResourceEvent(event: ResourceEventMsg) = {
129     val mongoID = new ObjectId()
130     event.setInStoreID(mongoID.toStringMongod)
131
132     val dbObject = new BasicDBObjectBuilder().
133       add(MongoDBStore.JsonNames._id, mongoID).
134       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
135       add(MongoDBStore.JsonNames.userID, event.getUserID).
136       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
137       add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
138     get()
139
140     MongoDBStore.insertDBObject(dbObject, resourceEvents)
141     event
142   }
143
144   def findResourceEventByID(id: String): Option[ResourceEventMsg] = {
145     val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id)
146     for {
147       dbObject ← dbObjectOpt
148       payload = dbObject.get(MongoDBStore.JsonNames.payload)
149       msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg)
150     } yield msg
151   }
152
153   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
154     val query = new BasicDBObjectBuilder().
155       add(MongoDBStore.JsonNames.userID, userID).
156       // received within the period
157       add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)).
158       add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)).
159       // occurred outside the period
160       add("$or", {
161         val dbList = new BasicDBList()
162         dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis)))
163         dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis)))
164         dbList
165       }).
166       get()
167
168     resourceEvents.count(query)
169   }
170
171   def foreachResourceEventOccurredInPeriod(
172       userID: String,
173       startMillis: Long,
174       stopMillis: Long
175   )(f: ResourceEventMsg ⇒ Unit): Long = {
176     var _counter= 0L
177     val query = new BasicDBObjectBuilder().
178       add(MongoDBStore.JsonNames.userID, userID).
179       add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
180       add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)).
181       get()
182
183     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)
184     val cursor = resourceEvents.find(query).sort(sorter)
185
186     withCloseable(cursor) { cursor ⇒
187       while(cursor.hasNext) {
188         val nextDBObject = cursor.next()
189         val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
190         val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
191
192         f(nextEvent)
193         _counter += 1
194       }
195     }
196
197     _counter
198   }
199   //-ResourceEventStore
200
201   //+ UserStateStore
202   def findUserStateByUserID(userID: String) = {
203     val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID)
204     for {
205       dbObject <- dbObjectOpt
206       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
207       msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg)
208     } yield {
209       msg
210     }
211   }
212
213   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
214     val query = new BasicDBObjectBuilder().
215       add(MongoDBStore.JsonNames.userID, userID).
216       add(MongoDBStore.JsonNames.isForFullMonth, true).
217       add(MongoDBStore.JsonNames.billingYear, bmi.year).
218       add(MongoDBStore.JsonNames.billingMonth, bmi.month).
219       get()
220
221     // Descending order, so that the latest comes first
222     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
223
224     val cursor = userStates.find(query).sort(sorter)
225
226     withCloseable(cursor) { cursor ⇒
227       MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
228     }
229   }
230
231   def findLatestUserState(userID: String) = {
232     val query = new BasicDBObjectBuilder().
233       add(MongoDBStore.JsonNames.userID, userID).
234       get()
235
236     // Descending order, so that the latest comes first
237     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
238
239     val cursor = userStates.find(query).sort(sorter)
240
241     withCloseable(cursor) { cursor ⇒
242       MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
243     }
244   }
245
246   /**
247    * Stores a user state.
248    */
249   def insertUserState(event: UserStateMsg)= {
250     val mongoID = new ObjectId()
251     event.setInStoreID(mongoID.toStringMongod)
252
253     val dbObject = new BasicDBObjectBuilder().
254       add(MongoDBStore.JsonNames._id, mongoID).
255       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
256       add(MongoDBStore.JsonNames.userID, event.getUserID).
257       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
258       add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth).
259       add(MongoDBStore.JsonNames.billingYear, event.getBillingYear).
260       add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth).
261       add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay).
262     get()
263
264     MongoDBStore.insertDBObject(dbObject, userStates)
265     event
266   }
267   //- UserStateStore
268
269   //+IMEventStore
270   def pingIMEventStore(): Unit = {
271     getCollection(MongoDBStore.IMEventCollection)
272     MongoDBStore.ping(mongo)
273   }
274
275   def insertIMEvent(event: IMEventMsg) = {
276     val mongoID = new ObjectId()
277     event.setInStoreID(mongoID.toStringMongod)
278
279     val dbObject = new BasicDBObjectBuilder().
280       add(MongoDBStore.JsonNames._id, mongoID).
281       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
282       add(MongoDBStore.JsonNames.userID, event.getUserID).
283       add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase).
284       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
285       add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
286     get()
287
288     MongoDBStore.insertDBObject(dbObject, imEvents)
289     event
290   }
291
292   def findIMEventByID(id: String) = {
293     val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id)
294     for {
295       dbObject ← dbObjectOpt
296       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
297       msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
298     } yield {
299       msg
300     }
301   }
302
303
304   /**
305    * Find the `CREATE` even for the given user. Note that there must be only one such event.
306    */
307   def findCreateIMEventByUserID(userID: String) = {
308     val query = new BasicDBObjectBuilder().
309       add(MongoDBStore.JsonNames.userID, userID).
310       add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get()
311
312     // Normally one such event is allowed ...
313     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
314
315     val dbObjectOpt = withCloseable(cursor) { cursor ⇒
316       if(cursor.hasNext) {
317         Some(cursor.next())
318       } else {
319         None
320       }
321     }
322
323     for {
324       dbObject <- dbObjectOpt
325       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
326       msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
327     } yield {
328       msg
329     }
330   }
331
332   /**
333    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
334    * the given function `f`.
335    *
336    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
337    */
338   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
339     val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
340     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
341
342     var _shouldContinue = true
343     withCloseable(cursor) { cursor ⇒
344       while(_shouldContinue && cursor.hasNext) {
345         val dbObject = cursor.next()
346         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
347         val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
348
349         _shouldContinue = f(msg)
350       }
351     }
352
353     _shouldContinue
354   }
355   //-IMEventStore
356
357   //+PolicyStore
358   def foreachPolicy[U](f: PolicyMsg ⇒ U) {
359     val cursor = policies.find()
360     withCloseable(cursor) { cursor ⇒
361       while(cursor.hasNext) {
362         val dbObject = cursor.next()
363         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
364         val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg)
365         f(policy)
366       }
367     }
368   }
369
370   def insertPolicy(policy: PolicyMsg): PolicyMsg = {
371     val mongoID = new ObjectId()
372     policy.setInStoreID(mongoID.toStringMongod)
373     val dbObject = new BasicDBObjectBuilder().
374       add(MongoDBStore.JsonNames._id, mongoID).
375       add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis).
376       add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis).
377       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)).
378     get()
379
380     MongoDBStore.insertDBObject(dbObject, policies)
381     policy
382   }
383
384   def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = {
385     // FIXME Inefficient
386     var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
387     foreachPolicy(_policies += _)
388     _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
389   }
390
391   def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = {
392     // FIXME Inefficient
393     var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
394     foreachPolicy(_policies += _)
395
396     immutable.SortedMap(_policies.
397       from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
398       to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
399       map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
400     )
401   }
402   //-PolicyStore
403 }
404
405 object MongoDBStore {
406   final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
407
408   final val collections = List("resevents","userstates","imevents","policies")
409
410   final val ResourceEventCollection = collections(0)
411
412   final val UserStateCollection = collections(1)
413
414   final val IMEventCollection = collections(2)
415
416   final val PolicyCollection = collections(3)
417
418   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
419     withCloseable(cursor) { cursor ⇒
420       if(cursor.hasNext) {
421         Some(f(cursor.next()))
422       } else {
423         None
424       }
425     }
426   }
427
428   def ping(mongo: Mongo): Unit = synchronized {
429     // This requires a network roundtrip
430     mongo.isLocked
431   }
432
433   def findOneByAttribute(
434       collection: DBCollection,
435       attributeName: String,
436       attributeValue: String,
437       sortByOpt: Option[DBObject] = None
438   ): Option[DBObject] =  {
439     val query = new BasicDBObject(attributeName, attributeValue)
440     val cursor = sortByOpt match {
441       case None         ⇒ collection find query
442       case Some(sortBy) ⇒ collection find query sort sortBy
443     }
444     withCloseable(cursor) { cursor ⇒
445       if(cursor.hasNext) Some(cursor.next()) else None
446     }
447   }
448
449   def insertDBObject(dbObj: DBObject, collection: DBCollection) {
450     collection.insert(dbObj, WriteConcern.JOURNAL_SAFE)
451   }
452
453   def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = {
454     for {
455       dbObject <- if(cursor.hasNext) Some(cursor.next()) else None
456       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
457       msg = AvroHelpers.specificRecordOfBytes(payload, fresh)
458     } yield {
459       msg
460     }
461   }
462
463   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
464     StdConverters.AllConverters.convertEx[DBObject](jsonSupport)
465   }
466 }